集成Lakeflow Connect与PostgreSQL
现代数据团队希望从PostgreSQL到Databricks Unity Catalog实现可靠、增量、近实时的数据摄取,而无需构建昂贵且脆弱的CDC作业、自定义管道或手动ETL编排。Lakeflow Connect通过为开发人员提供统一的低开销摄取框架来解决这个问题,该框架自动处理提取、CDC、模式同步和Unity Catalog内的表创建。
为什么需要Lakeflow Connect
在大多数工程团队中,数据摄取管道没有经过规划,也没有设计长期统一的愿景;不同的系统随着时间的推移被拼接在一起,个别工程师添加脚本,每个新源都会引入相同模式的另一个版本。虽然功能正常,但这些管道逐渐变得昂贵和脆弱,难以监控,并且几乎不可能跨环境扩展。
我的Lakeflow Connect配置通过为开发人员提供标准化的云原生摄取架构来改变这种动态,该架构在所有关系源中保持一致。无论您是要引入PostgreSQL、Oracle、SQL Server还是SAP,摄取体验都遵循相同的可预测模式:配置网关、定义管道、选择数据并将其放入Unity Catalog。这种一致性有助于团队消除自定义CDC逻辑,减少运营开销,并显著提高摄取可靠性。
参考架构:Lakeflow Connect + PostgreSQL + Unity Catalog
我主要使用五个主要组件:
- PostgreSQL:源数据库,启用WAL逻辑复制
- Lakeflow摄取网关:提取WAL日志的安全代理
- Lakeflow Connect管道:协调提取→摄取→Delta
- Unity Catalog:持续创建/更新的Delta表
- 消费者:ETL、BI、ML工作负载
端到端摄取流程
为了将更改从PostgreSQL通过安全摄取网关流式传输到Unity Catalog,将初始加载与连续CDC更新相结合,我使用了以下管道,该管道自动处理提取、传输和Delta表写入,具有完全托管的端到端摄取路径。
为Lakeflow准备PostgreSQL
我为Lakeflow Connect使用逻辑复制(CDC)遵循以下步骤:
步骤1:启用逻辑复制
1
2
3
4
5
|
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
SELECT pg_reload_conf();
|
步骤2:为Lakeflow创建用户
1
2
3
4
5
6
7
8
9
|
CREATE ROLE lakeflow_user
WITH LOGIN REPLICATION PASSWORD 'StrongPasswordHere';
GRANT CONNECT ON DATABASE appdb TO lakeflow_user;
GRANT USAGE ON SCHEMA public TO lakeflow_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO lakeflow_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO lakeflow_user;
|
步骤3:(可选)创建发布
如果必须显式发布表:
1
2
3
4
5
|
CREATE PUBLICATION lakeflow_publication
FOR TABLE
public.orders,
public.customers,
public.payments;
|
设置Lakeflow摄取网关
我的摄取网关部署在您的安全网络/VPC内部。它提取WAL日志并将更改批次安全地推送到Lakeflow。
网关配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
gateway:
name: pg-gateway-prod
region: us-east-1
mode: self_hosted
source:
type: postgresql
host: pg-prod.internal
port: 5432
database: appdb
username: lakeflow_user
password: ${PG_PASSWORD}
sslmode: require
cdc:
enabled: true
slot_name: lakeflow_slot
publication_name: lakeflow_publication
heartbeat_interval_sec: 10
telemetry:
enabled: true
log_level: info
|
创建摄取管道
我构建了一个摄取管道,用于控制提取、CDC行为、映射和目标设置。此管道定义:
- PostgreSQL源
- 要复制的表/模式
- 复制模式(初始加载+CDC)
- Unity Catalog目标
- 调度和警报
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
pipeline:
name: pg_orders_to_uc
description: Ingest PostgreSQL orders/customers into Unity Catalog
source:
type: postgresql
gateway: pg-gateway-prod
database: appdb
mode: incremental_cdc
cdc:
initial_load: true
include_deleted_rows: true
selection:
schemas:
- name: public
tables:
- name: orders
- name: customers
- name: payments
exclude_columns:
- table: customers
columns: ["ssn", "credit_card_number"]
target:
type: unity_catalog
catalog: lakehouse
schema: postgres_raw
table_naming:
prefix: pg_
case: lower
write_mode:
type: merge
keys:
orders: ["order_id"]
customers: ["customer_id"]
payments: ["payment_id"]
schedule:
type: continuous # real-time CDC
fallback:
on_failure: retry
max_retries: 5
backoff_sec: 60
notifications:
on_success:
- type: email
to: data-eng@company.com
on_failure:
- type: slack
webhook_url: ${SLACK_WEBHOOK_URL}
|
选择要复制的数据
我必须选择模式、表和可选的列过滤器来定义哪些数据流入数据湖仓。但是,您可以从以下选项中选择:
- 整个模式
- 特定表
- 列级排除(PII屏蔽)
- 增量与完全加载
示例列排除:
1
2
3
|
exclude_columns:
- table: customers
columns: ["ssn", "credit_card_number"]
|
这对于安全敏感的数据集非常有用。
配置Unity Catalog作为目标
我的配置将数据作为托管Delta表写入我选择的目录和模式中。Lakeflow Connect自动创建Delta表:
1
2
|
PGSQL
catalog.schema.table
|
分区和模式演化可以自动管理。
调度和通知
为了启用连续CDC或定期运行,并配置成功、失败或模式漂移的警报,我使用了以下可用选项之一:
- 连续CDC(最低延迟)
- 基于间隔(例如,每15分钟)
- 手动或API触发
1
2
3
|
schedule:
type: interval
every: 15m
|
在Unity Catalog中验证复制的数据
为了查询摄取的Delta表,使用SQL或PySpark确保管道正常工作,我使用了以下代码。
1
2
3
4
5
6
|
SHOW TABLES IN lakehouse.postgres_raw;
SELECT order_id, customer_id, order_total, order_date
FROM lakehouse.postgres_raw.pg_orders
WHERE order_date >= current_date() - INTERVAL '7' DAY
ORDER BY order_date DESC;
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
orders_df = spark.table("lakehouse.postgres_raw.pg_orders")
customers_df = spark.table("lakehouse.postgres_raw.pg_customers")
df = (
orders_df.alias("o")
.join(customers_df.alias("c"), "customer_id")
)
revenue = (
df.groupBy("c.country")
.sum("o.order_total")
.orderBy("sum(order_total)", ascending=False)
)
display(revenue)
|
开发人员清单
我使用了一个快速的端到端清单来验证我的PostgreSQL、网关、管道和Unity Catalog设置是否正确配置,以实现无缝的Lakeflow摄取。
1
2
3
4
5
6
7
8
9
10
|
[ ] 在PostgreSQL中启用wal_level=logical
[ ] 创建具有REPLICATION + SELECT权限的lakeflow_user
[ ] 授予模式/表级权限
[ ] 在您的VPC中部署摄取网关
[ ] 使用PG凭据和CDC详细信息配置网关
[ ] 创建以Unity Catalog为目标的管道
[ ] 选择要摄取的模式/表
[ ] 配置连续或间隔调度
[ ] 启用警报(Slack/Email)
[ ] 在Unity Catalog中验证Delta表
|
这种架构通过用干净、托管的摄取框架替换脆弱的ETL代码,使我的开发团队受益匪浅。这种变化使团队能够专注于构建功能,而不是维护管道,从而减少了运营开销。
总结
Lakeflow Connect提供了一个干净、现代、开发人员友好的管道,用于将PostgreSQL数据复制到Unity Catalog,而无需编写自定义CDC作业、批处理脚本或ETL粘合代码。
通过简单的网关、管道和目录工作流,开发人员可以建立生产级的摄取,支持基于日志的CDC、初始加载+增量同步、模式演化、自动Delta表创建、安全端到端传输以及实时和计划摄取。
如果您正在运行类似的自定义ETL设置,或者需要现代化摄取管道,请从Lakeflow Connect for PostgreSQL作为试点配置开始。一旦实施了Lakeflow Connect,您会想知道以前是如何管理昂贵、脆弱的自定义ETL作业的。