Lakeflow Connect与PostgreSQL集成完整指南

本文详细介绍了如何将Lakeflow Connect与PostgreSQL集成,实现从PostgreSQL到Databricks Unity Catalog的实时数据摄取,包括CDC配置、网关部署、管道创建和Delta表验证等完整技术流程。

集成Lakeflow Connect与PostgreSQL

现代数据团队希望从PostgreSQL到Databricks Unity Catalog实现可靠、增量、近实时的数据摄取,而无需构建昂贵且脆弱的CDC作业、自定义管道或手动ETL编排。Lakeflow Connect通过为开发人员提供统一的低开销摄取框架来解决这个问题,该框架自动处理提取、CDC、模式同步和Unity Catalog内的表创建。

为什么需要Lakeflow Connect

在大多数工程团队中,数据摄取管道没有经过规划,也没有设计长期统一的愿景;不同的系统随着时间的推移被拼接在一起,个别工程师添加脚本,每个新源都会引入相同模式的另一个版本。虽然功能正常,但这些管道逐渐变得昂贵和脆弱,难以监控,并且几乎不可能跨环境扩展。

我的Lakeflow Connect配置通过为开发人员提供标准化的云原生摄取架构来改变这种动态,该架构在所有关系源中保持一致。无论您是要引入PostgreSQL、Oracle、SQL Server还是SAP,摄取体验都遵循相同的可预测模式:配置网关、定义管道、选择数据并将其放入Unity Catalog。这种一致性有助于团队消除自定义CDC逻辑,减少运营开销,并显著提高摄取可靠性。

参考架构:Lakeflow Connect + PostgreSQL + Unity Catalog

我主要使用五个主要组件:

  • PostgreSQL:源数据库,启用WAL逻辑复制
  • Lakeflow摄取网关:提取WAL日志的安全代理
  • Lakeflow Connect管道:协调提取→摄取→Delta
  • Unity Catalog:持续创建/更新的Delta表
  • 消费者:ETL、BI、ML工作负载

端到端摄取流程

为了将更改从PostgreSQL通过安全摄取网关流式传输到Unity Catalog,将初始加载与连续CDC更新相结合,我使用了以下管道,该管道自动处理提取、传输和Delta表写入,具有完全托管的端到端摄取路径。

为Lakeflow准备PostgreSQL

我为Lakeflow Connect使用逻辑复制(CDC)遵循以下步骤:

步骤1:启用逻辑复制

1
2
3
4
5
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

SELECT pg_reload_conf();

步骤2:为Lakeflow创建用户

1
2
3
4
5
6
7
8
9
CREATE ROLE lakeflow_user
WITH LOGIN REPLICATION PASSWORD 'StrongPasswordHere';

GRANT CONNECT ON DATABASE appdb TO lakeflow_user;
GRANT USAGE ON SCHEMA public TO lakeflow_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO lakeflow_user;

ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO lakeflow_user;

步骤3:(可选)创建发布

如果必须显式发布表:

1
2
3
4
5
CREATE PUBLICATION lakeflow_publication
FOR TABLE
    public.orders,
    public.customers,
    public.payments;

设置Lakeflow摄取网关

我的摄取网关部署在您的安全网络/VPC内部。它提取WAL日志并将更改批次安全地推送到Lakeflow。

网关配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
gateway:
  name: pg-gateway-prod
  region: us-east-1
  mode: self_hosted

source:
  type: postgresql
  host: pg-prod.internal
  port: 5432
  database: appdb
  username: lakeflow_user
  password: ${PG_PASSWORD}
  sslmode: require

cdc:
  enabled: true
  slot_name: lakeflow_slot
  publication_name: lakeflow_publication
  heartbeat_interval_sec: 10

telemetry:
  enabled: true
  log_level: info

创建摄取管道

我构建了一个摄取管道,用于控制提取、CDC行为、映射和目标设置。此管道定义:

  • PostgreSQL源
  • 要复制的表/模式
  • 复制模式(初始加载+CDC)
  • Unity Catalog目标
  • 调度和警报
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
pipeline:
  name: pg_orders_to_uc
  description: Ingest PostgreSQL orders/customers into Unity Catalog

source:
  type: postgresql
  gateway: pg-gateway-prod
  database: appdb
  mode: incremental_cdc
  cdc:
    initial_load: true
    include_deleted_rows: true

selection:
  schemas:
    - name: public
      tables:
        - name: orders
        - name: customers
        - name: payments

  exclude_columns:
    - table: customers
      columns: ["ssn", "credit_card_number"]

target:
  type: unity_catalog
  catalog: lakehouse
  schema: postgres_raw
  table_naming:
    prefix: pg_
    case: lower
  write_mode:
    type: merge
    keys:
      orders: ["order_id"]
      customers: ["customer_id"]
      payments: ["payment_id"]

schedule:
  type: continuous  # real-time CDC
  fallback:
    on_failure: retry
    max_retries: 5
    backoff_sec: 60

notifications:
  on_success:
    - type: email
      to: data-eng@company.com
  on_failure:
    - type: slack
      webhook_url: ${SLACK_WEBHOOK_URL}

选择要复制的数据

我必须选择模式、表和可选的列过滤器来定义哪些数据流入数据湖仓。但是,您可以从以下选项中选择:

  • 整个模式
  • 特定表
  • 列级排除(PII屏蔽)
  • 增量与完全加载

示例列排除:

1
2
3
exclude_columns:
  - table: customers
    columns: ["ssn", "credit_card_number"]

这对于安全敏感的数据集非常有用。

配置Unity Catalog作为目标

我的配置将数据作为托管Delta表写入我选择的目录和模式中。Lakeflow Connect自动创建Delta表:

1
2
PGSQL
catalog.schema.table

分区和模式演化可以自动管理。

调度和通知

为了启用连续CDC或定期运行,并配置成功、失败或模式漂移的警报,我使用了以下可用选项之一:

  • 连续CDC(最低延迟)
  • 基于间隔(例如,每15分钟)
  • 手动或API触发
1
2
3
schedule:
  type: interval
  every: 15m

在Unity Catalog中验证复制的数据

为了查询摄取的Delta表,使用SQL或PySpark确保管道正常工作,我使用了以下代码。

1
2
3
4
5
6
SHOW TABLES IN lakehouse.postgres_raw;

SELECT order_id, customer_id, order_total, order_date
FROM lakehouse.postgres_raw.pg_orders
WHERE order_date >= current_date() - INTERVAL '7' DAY
ORDER BY order_date DESC;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
orders_df = spark.table("lakehouse.postgres_raw.pg_orders")
customers_df = spark.table("lakehouse.postgres_raw.pg_customers")

df = (
    orders_df.alias("o")
    .join(customers_df.alias("c"), "customer_id")
)

revenue = (
    df.groupBy("c.country")
      .sum("o.order_total")
      .orderBy("sum(order_total)", ascending=False)
)

display(revenue)

开发人员清单

我使用了一个快速的端到端清单来验证我的PostgreSQL、网关、管道和Unity Catalog设置是否正确配置,以实现无缝的Lakeflow摄取。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
[ ] 在PostgreSQL中启用wal_level=logical
[ ] 创建具有REPLICATION + SELECT权限的lakeflow_user
[ ] 授予模式/表级权限
[ ] 在您的VPC中部署摄取网关
[ ] 使用PG凭据和CDC详细信息配置网关
[ ] 创建以Unity Catalog为目标的管道
[ ] 选择要摄取的模式/表
[ ] 配置连续或间隔调度
[ ] 启用警报(Slack/Email)
[ ] 在Unity Catalog中验证Delta表

这种架构通过用干净、托管的摄取框架替换脆弱的ETL代码,使我的开发团队受益匪浅。这种变化使团队能够专注于构建功能,而不是维护管道,从而减少了运营开销。

总结

Lakeflow Connect提供了一个干净、现代、开发人员友好的管道,用于将PostgreSQL数据复制到Unity Catalog,而无需编写自定义CDC作业、批处理脚本或ETL粘合代码。

通过简单的网关、管道和目录工作流,开发人员可以建立生产级的摄取,支持基于日志的CDC、初始加载+增量同步、模式演化、自动Delta表创建、安全端到端传输以及实时和计划摄取。

如果您正在运行类似的自定义ETL设置,或者需要现代化摄取管道,请从Lakeflow Connect for PostgreSQL作为试点配置开始。一旦实施了Lakeflow Connect,您会想知道以前是如何管理昂贵、脆弱的自定义ETL作业的。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计