利用动态表和Medallion架构优化Snowflake中的ELT流程

本文详细介绍了如何在Snowflake中使用动态表和Medallion架构简化ELT工作流。动态表提供声明式、增量式数据处理,自动处理依赖关系和刷新,无需外部编排工具。文章涵盖从青铜层到黄金层的完整数据处理流程,包括与Snowpipe、Fivetran和Kafka的集成实践。

利用动态表和Medallion架构优化Snowflake中的ELT流程

动态表是Snowflake中的一种声明式方法,用于构建自动化、增量和依赖感知的数据转换。它们通过以最小操作开销提供大规模实时洞察,使您的数据管道现代化。

什么是动态表?

动态表是Snowflake中自动更新的物化表,可为您处理转换逻辑。您只需要定义:

  • 一个SQL转换查询
  • 目标新鲜度(例如,TARGET_LAG = ‘5分钟’)

无需使用Airflow或dbt Cloud等工具手动编排工作流,Snowflake会完成繁重的工作。它跟踪上游更改并自动更新表——仅处理新的或更改的数据。

结果?更快的管道、更新的洞察和更少的工作。

动态表的主要优势

  • 简化的ELT管道 – 无需外部编排器
  • 增量处理 – 仅刷新更改的数据
  • 新鲜度保证 – 基于TARGET_LAG自动更新
  • 计算效率 – 动态扩展资源
  • 智能依赖跟踪 – 仅在输入更改时刷新下游表
  • 管道弹性 – 无需手动作业或cron计划

动态表 vs. 视图 vs. 物化视图

特性 视图 物化视图 动态表
存储 否(仅虚拟) 是(预计算和存储) 是(自动刷新)
新鲜度 始终最新(查询时) 手动或自动刷新 通过TARGET_LAG维护
增量更新 有限
依赖感知 部分 是(自动)
计算使用 查询时 刷新时 刷新时(自动管理)
用例适合 轻量级查询 重用聚合 完整ELT管道逻辑
需要编排 是(外部) 通常需要 否(自编排)

Medallion架构:ELT的分层方法

Medallion架构将数据分为三个逻辑层:

描述 目的
青铜 原始、未处理的数据 从摄取工具捕获
白银 清理和验证的数据 过滤和转换的数据
黄金 聚合的业务数据 KPI和分析就绪的输出

此模型增强了数据平台的模块化、可观察性和可重用性。

动态表如何处理摄取工作负载

动态表设计用于直接在摄取管道上运行,无需手动编排即可实现实时或近实时数据转换。它利用Snowflake的元数据跟踪并自动高效刷新新数据,无论摄取方法如何,都能提供低延迟转换。

青铜层:使用Snowpipe、Fivetran或Kafka Connect进行摄取

1. Snowpipe + 动态表

用例: 适合微批处理、基于文件的摄取。 常见场景包括IoT遥测、点击流跟踪、日志数据以及JSON/CSV文件放入云存储(例如S3、GCS或Azure Blob)。

工作原理:

  • Snowpipe持续监控云存储阶段(例如Amazon S3)。
  • 当新文件到达时,它们通过内部COPY INTO操作自动加载到原始表(如orders_raw)中。
  • 在此原始表之上定义动态表(例如cleaned_orders)。
  • Snowflake使用元数据跟踪检测并仅转换新摄取的记录。

动态表行为:

  • 增量刷新:每次更新仅处理新记录。
  • 无需编排:Snowflake根据定义的TARGET_LAG自动触发刷新周期。
  • 高弹性:即使多个文件同时到达,Snowflake也能高效批处理并在后台处理。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
CREATE OR REPLACE DYNAMIC TABLE cleaned_orders
TARGET_LAG = '5 minutes'
WAREHOUSE = analytics_wh
AS
SELECT 
  order_id,
  customer_id,
  order_amount,
  order_status,
  order_date
FROM orders_raw
WHERE order_status IS NOT NULL;

当新的JSON订单文件通过Snowpipe到达时,cleaned_orders动态表会自动刷新——通常在5分钟内——无需cron作业,无需管道触发器。

最佳实践: 使用FILE_NAME、METADATA$FILENAME或摄取时间戳来跟踪批处理来源或在必要时去重行。

2. Fivetran + 动态表

用例: 从SaaS应用(如Salesforce、HubSpot、Stripe或Shopify)进行基于连接器的摄取。 适合从操作系统的批处理或近实时摄取。

工作原理:

  • Fivetran从源API提取数据并将其加载到原始Snowflake表(例如customers_raw)中。
  • 这些原始表反映完整快照或增量增量,具体取决于连接器类型。
  • 在这些原始输入之上定义动态表(例如cleaned_customers)。
  • Snowflake自动跟踪元数据更改并触发转换,无需任何外部编排。

动态表行为:

  • 自动刷新:当Fivetran更新原始表时,Snowflake检测更改并相应刷新下游动态表。
  • 高效处理:仅转换受上游更改影响的行——仅此而已。
  • 自愈管道:无需管理同步计划或刷新逻辑;全部由Snowflake原生处理。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
CREATE OR REPLACE DYNAMIC TABLE cleaned_customers
TARGET_LAG = '5 minutes'
WAREHOUSE = analytics_wh
AS
SELECT 
  customer_id,
  first_name,
  last_name,
  email,
  is_active
FROM customers_raw
WHERE is_active = TRUE;

当Fivetran完成同步一批更新的客户配置文件时,cleaned_customers会自动拾取并处理它们——无需dbt作业,无需调度器。

最佳实践: 在源系统中使用软删除(例如is_deleted或is_active标志),以允许通过动态表逻辑在白银层中过滤过时的行。

3. Kafka Connect + 动态表

用例:

  • 高频事件驱动摄取
  • 常见于实时分析、欺诈检测、用户交互跟踪和应用遥测

工作原理:

  • Kafka Connect使用Snowflake Kafka Connector将事件流(JSON、Avro或CSV)发送到Snowflake。
  • 事件持续追加到原始流表(如event_logs_raw或orders_raw)中。
  • 动态表反应性地拾取新事件并应用转换,无需人工干预。

动态表行为:

  • 高响应性:近乎即时响应传入事件数据,取决于TARGET_LAG
  • 增量设计:Snowflake避免重新处理完整表,仅操作新的分区/事件块
  • 无缝流到批处理:您可以通过仓库简单性实现近流性能
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE OR REPLACE DYNAMIC TABLE real_time_metrics
TARGET_LAG = '1 minute'
WAREHOUSE = analytics_wh
AS
SELECT 
  customer_id,
  COUNT(order_id) AS order_count,
  SUM(order_amount) AS total_value,
  MAX(event_time) AS last_activity
FROM orders_raw
GROUP BY customer_id;

real_time_metrics在从Kafka到达新事件的1分钟内保持最新——使其非常适合实时仪表板或警报系统。

最佳实践: 为防止重复处理(尤其是在重播期间),使用去重逻辑(如ROW_NUMBER())或利用Kafka消息元数据。

动态表消除了轮询、调度或外部编排工具的需要。它们提供:

  • 性能效率 – 仅处理已更改的数据
  • 成本优化 – 通过避免全表刷新最小化计算
  • 近实时分析 – 通过TARGET_LAG驱动的更新保持新鲜

白银层:清理和验证的数据

动态表清理、标准化和执行规则:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CREATE OR REPLACE DYNAMIC TABLE cleaned_orders
TARGET_LAG = '5 minutes'
WAREHOUSE = analytics_wh
AS
SELECT
    order_id,
    customer_id,
    order_amount,
    order_status,
    order_date
FROM orders_raw
WHERE order_status IS NOT NULL;

CREATE OR REPLACE DYNAMIC TABLE cleaned_customers
TARGET_LAG = '5 minutes'
WAREHOUSE = analytics_wh
AS
SELECT
    customer_id,
    first_name,
    last_name,
    email,
    is_active
FROM customers_raw
WHERE is_active = TRUE;

黄金层:聚合的业务级洞察

这里我们计算用于BI、ML和报告的指标:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
CREATE OR REPLACE DYNAMIC TABLE customer_order_summary
TARGET_LAG = '10 minutes'
WAREHOUSE = analytics_wh
AS
SELECT
    c.customer_id,
    c.first_name,
    c.last_name,
    COUNT(o.order_id) AS total_orders,
    SUM(o.order_amount) AS total_revenue,
    MAX(o.order_date) AS last_order_date
FROM cleaned_customers c
LEFT JOIN cleaned_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name;

监控动态表健康状态

轻松跟踪刷新操作和诊断问题:

1
2
SELECT * 
FROM SNOWFLAKE.INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH

最佳实践

  • 根据用例设置TARGET_LAG值(越低=越新鲜=更多计算)
  • 避免过深的转换链
  • 使用清晰的命名约定(bronze_、silver_、gold_)
  • 通过INFORMATION_SCHEMA监控刷新状态

动态表和Medallion架构提供了一种可扩展、声明式和低维护的方式来构建ELT管道,无论您是通过Snowpipe、Fivetran还是Kafka进行摄取。

Snowflake确保仅以增量、高效和可靠的方式处理正确的数据。此框架消除了编排复杂性,加速了洞察,并使您的分析平台为实时决策做好准备。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计