使用DBT将数据加载到Redshift
在Yelp,我们拥抱创新并积极探索新的可能性。随着消费者对数据需求的不断增长,我们最近重新审视了如何更高效地将数据加载到Redshift。在这篇博客文章中,我们将探讨如何将DBT与Redshift Spectrum无缝结合使用,从数据湖读取数据到Redshift,从而显著减少运行时间、解决数据质量问题并提高开发效率。
起点
我们多年来将批处理数据加载到Redshift的方法一直很有效,但我们持续寻求改进。我们主要使用Spark作业读取S3数据,并将其发布到我们基于Kafka的内部数据管道(您可以在此处了解更多),以将数据获取到数据湖和Redshift。然而,我们开始遇到一些痛点:
- 性能:较大的数据集(每天超过1亿行)开始面临延迟。这主要是由于需要执行表扫描以确保在upsert操作中不会重复主键。
- 模式更改:大多数表都配置了Avro模式。模式更改有时很复杂,因为它们需要多步骤过程来创建和注册新的Avro模式。
- 回填:对数据进行回填修正的支持很差,因为没有简单的方法可以原地修改行。我们经常需要手动删除数据,然后为整个分区写入修正后的数据。
- 数据质量:并行写入数据湖和Redshift存在数据分歧的风险,例如两个数据存储之间的数据类型差异。
使用DBT改进Redshift加载
在考虑如何更高效地移动数据时,我们选择利用AWS Redshift Spectrum,这是一个专门构建的工具,使得可以从Redshift查询数据湖数据。由于数据湖表通常具有最新的模式,我们决定将其作为Redshift批处理的数据源,而不是S3。这不仅有助于减少数据分歧,还符合我们将数据湖视为单一事实来源的最佳实践。
在实现方面,Spectrum需要一个已定义的架构,该架构已经存在于我们数据湖表的Glue中。唯一需要的其他额外设置是将数据湖表添加为外部表,使它们可以通过简单的SQL查询从Redshift访问。
我们已经开始在其他数据集中采用DBT,但它似乎也是在我们流水线中捕获Redshift Spectrum查询的完美候选者。DBT擅长转换数据,并有助于强制执行编写模块化和版本控制的SQL。我们使用DBT简单地将数据从数据湖直接复制到Redshift,而不是使用Spark作业从S3读取到Redshift。DBT不仅提供了其通常的可重复性、灵活性和数据血缘关系的标志性优势,还帮助我们解决了上述提到的一些痛点。
简化的模式更改
为了简化模式更改,我们利用了DBT的on_schema_change配置参数。通过将其设置为append_new_columns,我们确保如果传入数据中缺少某些列,这些列不会被删除。我们还使用DBT契约作为第二层保护,以确保正在写入的数据与模型的配置匹配。
减少手动回填
使用DBT,回填也变得容易得多。通过使用DBT的pre_hook配置参数,我们可以指定一个在模型执行之前要执行的查询。这使我们能够更自动地删除即将写入的分区数据。既然我们可以保证幂等性,就可以进行回填,而不用担心过时的数据没有被删除。
数据去重
为了解决重复行的问题,我们在SQL中添加了一个去重层,并通过DBT测试进行了验证。虽然DBT有内置的唯一列测试,但对于我们的大表来说不可行,因为它们需要扫描整个表。相反,我们使用了dbt_expectations包中的expect_column_values_to_be_unique函数。这使我们能够指定行条件,仅扫描最近写入的行。
性能提升
最显著的收获是在性能方面,特别是对于我们最大和最有问题的Redshift数据集:
- 写入过去需要大约2小时,但现在通常只需10分钟即可运行。
- 以前,每月有时会有多达6小时的延迟。现在我们不再经历任何延迟!这大大减轻了我们的值班事件响应工作的负担。
- 模式升级过去是一个较长的多步骤过程。现在已经改进为一个只需几个小时的3步过程。
更好的数据一致性
通过消除数据流的分叉,我们增加了数据在不同数据存储之间不会出现分歧的信心。由于任何进入Redshift的数据都必须首先通过数据湖,我们可以更好地确保数据湖保持为我们单一的事实来源。
结论
在迁移成功后,我们将这些更改推广到大约十几个其他数据集,并观察到类似的全面好处。通过利用AWS Redshift Spectrum和DBT等工具,我们更好地将我们的基础设施与不断发展的数据需求对齐,为我们的用户和利益相关者提供了更大的价值。