Apache Airflow 详解:初学者友好指南
引言
现代数据驱动型企业都依赖于数据管道,这些管道用于获取、转换、丰富和移动信息。这些数据管道可能涉及许多步骤(提取原始数据、清洗数据、训练机器学习模型、构建仪表板等)。此外,它们必须以非常特定的顺序运行。像 Apache Airflow 这样的工作流编排工具有助于确保所有这些步骤在正确的时间以正确的顺序运行,并使管道易于监控和管理。
Airflow 于 2014 年在 Airbnb 内部构建,现已成为最受欢迎的开源工作流编排平台之一。成千上万的组织使用它来自动化和监控批处理管道。在本文中,我们将从基础开始解释 Apache Airflow。我们将涵盖的主题包括:
- Airflow 是什么,它能做什么,以及数据团队为何使用它。
- Airflow 的核心概念和组件,以及它们如何组合在一起。
- 管道如何被定义为有向无环图,并通过一个简单的 DAG 实例进行讲解。
- 常见的用例,如 ETL/ELT、MLOps 等。
我们希望这篇实用的、以示例驱动的介绍,能帮助刚开始接触工作流编排的工程师、学生和团队入门 Apache Airflow。
关键要点
- Apache Airflow 是一个基于 Python 的工作流编排平台,它使用 DAG(有向无环图)来管理、调度和监控复杂的数据管道。
- DAG 定义了任务及其依赖关系,允许 Airflow 以正确的顺序执行每个步骤,并提供内置的重试、日志记录和可观察性。
- Airflow 的架构(调度器、执行器、工作器、元数据数据库、Web UI)支持可扩展的分布式执行和管道的实时监控。
- Airflow 在批处理工作流(如 ETL/ELT 作业、ML 训练管道、数据质量检查、计划分析)方面表现出色,但不适合实时或超高频率的工作负载。
- 根据具体需求(如事件驱动的工作流、Kubernetes 原生执行或现代开发者友好界面),可以选择 Prefect、Dagster、Luigi、Argo Workflows 和 Mage AI 等替代方案。
什么是 Apache Airflow?
Apache Airflow 是一个用于编写、调度和监控工作流的开源系统。Airflow 工作流是一个 DAG(有向无环图),它是一系列具有明确定义关系(依赖关系)且没有任何循环(任务不能在上游重复自身)的任务。Airflow 的核心哲学是“配置即代码”,因此您无需使用带有拖放界面的 UI,而是编写 Python 脚本来表示工作流。这种方法为工程师提供了极大的灵活性:您可以通过 Airflow 的运算符和 Python 库连接到几乎任何技术。您还可以对管道定义应用软件工程最佳实践(版本控制、测试等)。
为什么工作流编排很重要? 关键在于,通常在任何数据工作流中,都有多个必须按顺序执行的步骤。一个简单的例子是 ETL(提取/转换/加载)管道。随着添加更多任务、依赖关系、分支等,并且需要在固定节奏下运行,管道会迅速变得复杂。在这种情况下,手动干预或简单的调度器工具往往变得不够用。Airflow 通过充当中央编排器来解决此问题。它拥有一个有向无环图,包含任务(节点)和依赖关系(边)。它知道何时执行哪个任务(等待上游依赖关系完成后再运行),包括调度(定期间隔执行)、错误处理(自动重试、警报)和日志记录。
如果我们将整个数据管道视为一份食谱,那么每个单独的任务就是一个子步骤(切菜、煮沸、装盘等)。Airflow 就像是厨房经理,它知道食谱并关注每个厨房中发生的所有不同子步骤,确保一切都以正确的顺序在正确的时间发生。
没有工作流编排,您可能拥有所有的厨师和食材,但它们没有被协调起来。Airflow 提供了这种协调,从而能够产出所需的“菜肴”(工作流运行)。
为什么数据团队使用 Airflow?
Airflow 之所以在数据工程师、ML 工程师和 DevOps 团队中流行,是因为它解决了复杂数据管道的许多需求。以下是团队选择 Airflow 的几个主要原因:
- 编排复杂的工作流和依赖关系:Airflow 专为具有大量相互依赖步骤的工作流而设计。Airflow 可以轻松声明上游和下游要求,确保任务按正确顺序执行。如果任务 C 依赖于任务 A 和 B,Airflow 调度器了解这些关系,并确保在运行 C 之前运行 A 和 B(可能并行)。Airflow 可以处理复杂的依赖关系、分支逻辑等。
- 集中式调度(增强版 Cron):它提供了一个调度器,允许您按特定计划执行任务。您可以使用 Airflow 的调度器定义数十个或数百个在不同时间(每小时、每天、每月)触发任务的管道。Airflow 理解任务关系。它可以智能地调度和触发任务运行。中央调度器使您不必在不同的服务器上编写不同的脚本。
- 可扩展性和分布式执行:Airflow 可以随工作流扩展。它可以配置为在多个工作进程或机器上并行执行任务。它还提供了多种执行器(本文稍后讨论),可以在分布式系统(如 Celery 或 Kubernetes)上跨节点集群运行任务。这意味着您可以并行化繁重任务(大数据处理)并让多个管道同时运行。
- 监控、日志记录和警报:它附带一个功能丰富的 Web UI,提供工作流的实时可见性。您可以看到哪些任务成功、哪些失败,并查看每个任务的日志输出。您甚至可以通过 Web 界面手动触发和/或重试任务。
Airflow 的核心组件
以下是 Apache Airflow 核心构建块的简要概述。使用此表作为快速参考,以了解每个组件是什么以及它的作用。
| 组件 | 是什么 | 核心职责 |
|---|---|---|
| DAGs(有向无环图) | 用 Python 编写的工作流蓝图;具有依赖关系且无循环的任务图。 | 定义哪些任务运行以及以何种顺序运行;编码调度和元数据;支持在 UI 中可视化。 |
| Tasks(任务) | 最小的工作单元(DAG 图中的节点),从运算符或 TaskFlow 函数实例化而来。 | 执行特定操作(例如,运行 SQL、调用 Python、触发 API、移动文件);报告成功/失败。 |
| Scheduler(调度器) | 持续评估 DAG 和任务状态的持久服务。 | 根据时间表和依赖关系确定每个任务应何时运行;创建任务实例;管理回填和重试。 |
| Executor(执行器)和 Workers(工作器) | 实际运行任务的执行后端和进程。 | 在所选后端(本地进程/线程、Celery 工作器、Kubernetes Pod)上启动任务实例;将结果返回给调度器。 |
| Web Server(Web 服务器/Web UI) | 由 Airflow 的 Web 服务器提供的操作用户界面。 | 可观察性和控制:查看 DAG、触发/清除任务、检查日志、暂停 DAG、管理连接/变量。 |
| Metadata Database(元数据数据库) | 持久化的系统记录(通常使用 PostgreSQL/MySQL)。 | 存储 DAG 运行、任务实例、状态、配置、日志元数据和操作历史。 |
Airflow 的工作原理(架构)
以下是 Airflow 处理工作流(DAG)的逐步概述:
- 编写 DAG:DAG 作者编写一个 Python 脚本来定义 DAG。该脚本包括诸如时间表(例如,“每天上午 9 点运行”)或手动触发设置等内容。它还包含任务的定义(通过运算符或
@task函数)以及任务依赖关系(例如,task1 >> task2表示 task2 等待 task1 完成)。Python 文件保存在 Airflow 的 DAGs 文件夹中,以便被 Airflow 发现。Airflow 将定期读取(解析)此文件以加载 DAG 定义。 - 调度和 DAG 解析:Airflow 调度器是一个在后台持续运行的进程。它根据每个 DAG 的时间表或触发器检查其是否应该运行。例如,一个计划为“每日”的 DAG 可以被调度器拾取,并注意到必须在午夜安排一个新的运行。Airflow 调度器还会检查 DAG 中的任务是否准备好运行——这意味着它们的所有上游依赖关系都已完成,并且它们的计划时间(逻辑日期)已到。
- 执行任务:每个要执行的任务由调度器交给执行器。执行器决定在哪里以及如何运行任务。它可以在本地执行,也可以发送到工作器队列。对于分布式架构,任务将被添加到队列中,然后由执行任务的工作器进程选取。
- 更新状态和错误处理:每个任务实例在完成时将其状态(成功、失败等)报告回调度器,调度器使用任务状态更新其元数据数据库。 状态跟踪是让调度器知道下一步该做什么的关键——例如,如果任务 A 已成功完成,它就知道可以调度任务 B;如果任务 A 失败但仍有重试次数,则安排重试;如果失败且无剩余重试次数,则将 DAG 运行标记为失败。如果任务失败,Airflow 具有根据配置自动重试任务的逻辑。如果在重试后仍失败,Airflow 可以发出失败警报(发送电子邮件或 Slack 消息等)。所有这些都可以作为 DAG 或任务参数的一部分进行配置(例如,重试次数、重试延迟、警报发送的电子邮件)。
- 监控和干预:Airflow Web UI 提供了上述过程的实时界面。您可以看到 DAG 运行出现(例如,在树状视图中为今天的运行显示一条新线),并观察任务状态在进展过程中发生变化。图视图可视化 DAG 的任务,并显示哪些任务已完成或出错。
- 清理和下一次运行:当 DAG 运行完成(所有任务都已成功或已被跳过/失败)时,Airflow 将转到下一个计划运行。所有关于过去运行的元数据仍保留在数据库中,您可以随时查看历史记录。调度器将继续根据时间表或手动触发触发新的运行。此过程会重复进行,直到 Airflow 进程被终止。
理解 Airflow 中的 DAG
Airflow 中的 DAG 定义了执行工作流所需的时间表、任务和依赖关系。DAG 不需要知道每个任务内部是什么;它只需要定义它们应该在何时以及以何种顺序运行。
任务、运算符和传感器
任务是 Airflow 中的基本工作单元。在 Airflow 中,任务是运算符的实例,运算符是预定义操作的模板。Airflow 附带各种核心运算符。BashOperator 允许您运行 shell 命令,PythonOperator 允许您调用 Python 函数。Airflow 还提供了一个 @task 装饰器,允许您将常规 Python 函数转换为任务。
除了运算符,Airflow 还提供了传感器。传感器是等待满足某个条件或事件后才成功的任务。它们可以用于等待文件出现或表被填充等情况。
任务依赖关系
要声明任务之间的依赖关系,您可以使用 >> 和 << 运算符(推荐)或诸如 set_upstream 和 set_downstream 这样的方法。也可以链接任务、创建交叉下游依赖关系,甚至动态构建任务列表。没有任何依赖关系的 DAG 将只是一组独立的任务。
DAG 设计最佳实践 下表列出了设计 DAG 时需要牢记的一些准则。
| 最佳实践 | 描述 | 实用技巧 / 示例 |
|---|---|---|
| 确保 DAG 真正是无环的 | DAG 不得包含循环。在复杂的工作流中,很容易引入间接循环(例如,A → B → C → A)。Airflow 会自动检测循环并拒绝运行此类 DAG。 | 定期检查依赖关系。使用图视图直观确认 DAG 没有循环。重构复杂 DAG 以避免意外循环。 |
| 保持任务幂等且小巧 | 每个任务应执行单个逻辑步骤,并且能够安全地重新运行而不会损坏数据。幂等任务确保重试不会产生不一致的结果。 | 使用“插入或替换”模式。在提交结果之前写入临时文件。将大型脚本拆分为多个任务。 |
| 使用描述性 ID 和文档 | 清晰的命名和文档可提高代码和 Airflow UI 中的可读性和可维护性。 | 使用有意义的 DAG ID 和任务 ID。使用 dag.doc_md 或 task.doc_md 添加文档。命名与业务逻辑保持一致。 |
| 利用 Airflow 特性 | 使用 Airflow 内置功能进行通信、凭证处理和管道协调,而不是自定义逻辑。 | 使用 XComs 进行小数据共享。使用变量和连接处理配置和密钥。尽可能使用内置运算符和钩子。 |
| 测试和版本控制 DAG 代码 | DAG 就是代码,应使用适当的软件工程实践进行维护:版本控制、测试和 CI/CD。 | 将 DAG 存储在 Git 中。为自定义运算符和逻辑编写测试。在部署到生产环境之前使用本地/测试环境。 |
| 避免使调度器过载 | 大量 DAG 或 DAG 文件中的繁重计算会降低调度器速度。DAG 应轻量级以便解析。 | 监控调度器性能。将大型 DAG 拆分为较小的 DAG。避免在 DAG 导入期间进行 API 调用或繁重计算。 |
| 定义清晰的故障处理策略 | 规划应如何处理故障。并非所有错误都应重试,长时间运行的任务可能需要 SLA 或警报。 | 仅对暂时性故障使用重试。使用传感器和触发器处理基于事件的依赖关系。为长时间或关键任务设置 SLA。 |
示例:一个简单的 Airflow DAG
没有什么比一个例子更能巩固概念了。让我们通过一个简单的 Airflow DAG 代码并解释其各部分。这个例子将创建一个具有两个顺序任务的小型工作流:
|
|
让我们分解一下这里发生了什么:
- 导入所需的类:
DAG(用于定义 DAG)和BashOperator(用于在任务中运行 shell/bash 命令)。我们还导入了datetime来指定 DAG 的开始日期。 - 我们使用上下文管理器创建一个 DAG 对象(
with DAG(…) as dag:)。为 DAG 提供一个唯一的 ID “example_dag”(在 Airflow 环境中必须是唯一的)、一个描述(可选,以帮助人类阅读)、一个start_date和一个schedule_interval。这里,我们设置了一个内置的schedule_interval预设(@daily)和start_date=datetime(2025,1,1)。我们还设置了catchup=False,以防止在start_date和今天之间回填任何缺失的运行(我们只希望它从现在开始每天运行)。整个代码块创建了一个 DAG,Airflow 将知道并每天在一天开始时运行它。 - 然后我们在 DAG 上下文中定义两个任务:
task1使用BashOperator定义,用于执行 bash 命令(date)。task_id是 “print_date”,将用于在 UI 和日志中标识此任务。运行时,此任务将简单地打印当前日期/时间(到任务的日志)。task2也使用BashOperator,task_id="echo_hello",command=echo 'Hello, Airflow!'。这将打印一条友好的消息。目前这些任务只是被定义,尚未链接。
- 最后一行(
task1 >> task2)指定了一个依赖关系(流向):task1流向task2。这意味着任务 1 将运行并成功完成后,任务 2 才能被考虑调度运行。在 DAG 图中,将有一个箭头从 task1 指向 task2。
此示例中需要注意的几点:
- 为了简单起见,我们只使用了
BashOperator,但我们本可以使用混合类型的运算符 DAG。我们可以使用PythonOperator调用 Python 函数,或者使用SimpleHttpOperator调用 API 等。 with DAG(…) as dag:上下文模式是创建 DAG 实例的常用习惯用法。在其中创建的所有任务都会自动与该 DAG 关联。catchup=False是一个参数,在许多具有过去start_date的 DAG 中使用,当您不希望它运行为所有过去日期时。如果我们保留catchup=True(默认值)并且今天是 2025 年 1 月 10 日,Airflow 将尝试为 1 月 1 日、2 日、3 日……直到 1 月 10 日运行 DAG,以补上那些错过的日期。我们在此关闭它以简化。- 此示例使用了
BashOperator的相对导入路径。在 Airflow 2 中,许多运算符被移到了airflow.providers下。此示例基于这些提供程序已安装的假设编写(BashOperator在 Airflow 核心本身中)。
这个简单的示例可以很容易地扩展——您可以添加额外的任务(也许调用 API,然后将结果加载到数据库,然后发送通知)。您还可以添加更复杂的逻辑(如分支——Airflow 支持使用 BranchOperator 进行 if/else 风格的分支,例如)。
随着 DAG 规模变大,Airflow 的 UI 和日志记录对于了解正在发生的情况变得非常宝贵。在此示例中,print_date 的日志将包含打印的系统日期,echo_hello 的日志将包含 “Hello, Airflow!”。如果 print_date 失败(也许在假设的场景中找不到 date 命令),那么 echo_hello 将永远不会执行(由于依赖关系),并且 DAG 运行将被标记为失败。然后您可以检查日志,修复问题,并重新运行该任务或整个 DAG。
Airflow 的常见用例
下表是对 Apache Airflow 常见用例的结构化总结。该表突出了每个用例涉及的内容以及 Airflow 在其中如何增加价值。
| 用例 | 描述(涉及内容及典型步骤) | Airflow 如何帮助 / 好处 |
|---|---|---|
| ETL/ELT 数据管道 | 使用 ETL(提取-转换-加载)或 ELT 模式将数据从多个源移动到数据仓库或数据湖。 | DAG 以正确的顺序编排 提取 → 转换 → 加载。Airflow 与许多数据库和存储系统集成。如果步骤失败(例如 API 宕机),下游任务将停止,并且可以触发警报,从而提高批处理和增量同步的可靠性。 |
| 数据仓库和 BI 报告 | 准备和刷新仪表板和报告中使用的分析数据。 | 安排每日或定期作业,以便报告始终基于最新数据运行。协调 SQL 工作负载、质量检查和报告步骤。提供监控和通知,使故障(例如损坏的聚合)可见,而不是无声地破坏 BI 输出。 |
| 机器学习管道 | 自动化从数据准备到部署的端到端 ML 工作流。 | 将每个 ML 阶段表示为 DAG 中的一个任务,并强制执行正确的顺序。可以在步骤之间持久化工件(预处理数据、模型二进制文件)。与 ML 框架和 Kubernetes 运算符集成,并支持在非高峰时段进行计划的重新训练和实验编排。 |
| 数据质量检查和验证 | 运行自动检查以确保数据完整、一致且可信。 | 安排定期的数据质量 DAG(每天/每周)。跨数据库和 QA 脚本协调检查,在异常时向团队发出警报,并可以作为数据可靠性工程的一部分触发下游纠正任务。 |
| 事务性数据库维护和备份 | 为生产数据库和基础设施自动化例行的运维任务。 | 集中调度和监控维护任务。确保备份和清理工作在定义的时间一致运行,减少手动工作和人为错误,并提供日志/历史记录,以便团队确认关键维护作业已完成。 |
| 集成 / 工作流自动化 | 编排跨多个服务和 API 的业务或系统工作流。 | 充当灵活的“粘合”层,连接服务。DAG 编码复杂的分支和条件逻辑。运算符和 Python 任务支持自定义集成。提供了一个集中位置来管理、监控和重试业务流程步骤,而不是将自动化逻辑分散在临时脚本和工具中。 |
何时不应使用 Airflow
尽管 Airflow 功能强大,但它并非适用于所有情况。了解其局限性有助于您选择正确的编排方法:
- 实时或流式工作负载:Airflow 是批处理导向的,具有明确定义的开始和结束。它不适合长时间运行的、事件驱动的或流式工作负载。如果您需要摄取连续的事件流(用户点击、物联网传感器等)并以非常低的延迟实时处理这些事件,Airflow 不是最佳选择。
- 高频率或大量短任务:这是 Airflow 可能无法有效工作的另一种情况。Airflow 任务有一定的开销(在数据库跟踪等中),因此如果您需要每几秒触发一次,或者有数千个微小任务,Airflow 可能无法很好扩展。
- 纯粹事件驱动的工作流:这与流式用例类似,但适用于应该由事件触发的情况。“每当事件 X 发生时,执行 Y” 是 Airflow 的一个完全有效的用例,但如果触发该操作的唯一条件是某个事件(例如,“每当文件到达 S3 存储桶时,执行 X”),那么 Airflow 在这里并不是最轻量级的解决方案。Airflow 通常使用传感器(例如,
S3KeySensor)来“感知”文件/事件的存在。传感器有缺点,因为它们依赖于轮询。传感器的问题包括触发现有文件以及不一定对事件响应(例如,它们按时间间隔轮询)。
简而言之,Airflow 适用于周期性的、批处理导向的或复杂到需要定义和管理的工作流。如果您的用例需要实时或连续处理、超高频作业或非常简单且不需要 Airflow 开销的工作流,它可能不是一个好的选择。如果是这样,您可能需要探索其他选项或简化解决方案。
Airflow 的替代方案
近年来,工作流编排领域蓬勃发展,虽然 Airflow 是领导者,但有几个替代工具值得了解:
| 工具 / 服务 | 简介 | 参考 |
|---|---|---|
| Luigi | 由 Spotify 创建的开源 Python 工作流调度器。适合用代码构建具有任务和依赖关系的批处理管道。架构比 Airflow 更简单、更轻量,但生态系统较小,内置集成较少。 | Luigi 文档 |
| Prefect | 以 Python 原生的编排框架定位,是 Airflow 更现代、对开发者更友好的替代品。使用流和任务,提供调度、重试和可观察性。可以完全开源运行,或使用 Prefect Cloud 获得托管的 UI 和控制平面。 | Prefect 官网 Prefect 文档 |
| Dagster | 专注于软件定义资产和数据感知管线的数据编排器。强调类型安全、测试和开发工作流。非常适合关心数据沿袭、质量和现代工程实践的团队。 | Dagster 官网 Dagster 文档 |
| Kedro | 用于构建可重现、可维护的数据和 ML 管线的 Python 框架。专注于项目脚手架、模块化管道和最佳实践。通常与 Airflow 等编排器一起使用,而不是取代它。 | Kedro 官网 Kedro 文档 |
| Argo Workflows | 作为 CRD 实现的 Kubernetes 原生工作流引擎。每个步骤都在一个容器中运行,非常适合云原生批处理作业、CI/CD 以及 Kubernetes 重度环境中的 ML 管道。 | Argo Workflows 官网 |
| Mage AI | 现代数据管道工具,用于构建、运行和管理 ETL、流式和 ML 管道,具有类似笔记本的视觉界面。专注于开发者体验和快速迭代。 | Mage AI 官网 |
| Kestra | 使用声明性 YAML 工作流的开源、事件驱动的编排平台。专为具有强大插件生态系统的可扩展、计划性和事件驱动的数据及流程编排而设计。 | Kestra 官网 Kestra 文档 |
Airflow 不是唯一的选项,最佳选择取决于您的具体用例。Luigi、Prefect 和 Dagster 通常被认为是同一领域(基于 Python 的工作流编排器)的主要其他开源选项。如果您发现 Airflow 的陈旧 UI 或其他限制很麻烦,您应该考虑评估这些工具。Prefect 试图成为“更好”或“更简单”的 Airflow,Dagster 试图成为一个更“结构化”的、强调数据资产的编排器,而 Luigi 是 Airflow 更简单的前身。另一方面,如果您的用例差异很大(例如实时流式处理或完全云原生),您可能根本不使用 Airflow,而是直接考虑流式平台或托管的编排服务。
最后思考
Apache Airflow 是一个用于编排复杂计算工作流和数据处理管道的开源平台。最初由 Airbnb 开发,它在数据工程和机器学习社区中获得了极大的普及。Airflow 的组件包括调度器、执行器、工作器、元数据数据库和基于 Web 的 UI。这使团队能够轻松运行和维护生产就绪的 ETL、MLOps 和基础设施管道。Airflow 的 DAG(有向无环图)抽象使得工作流能够明确、可测试并可长期维护。该平台由社区构建的运算符和提供程序组成的生态系统,也简化了依赖于任何技术或工具的工作流编排。
然而,它并不是一个万能的解决方案。它最适用于批处理或微批处理管道,并且用户必须熟悉 Python。对于流式处理或高频率工作流,有比 Airflow 更合适的其他解决方案。对于数据科学家或喜欢编写声明性管道的团队,可能值得考虑 Airflow 的替代工具。
Airflow 仍在积极开发中,新功能不断添加,例如事件驱动调度、数据集和资产。无论您的背景如何,本文介绍的概念和最佳实践都应有助于您理解工作流编排的工作原理以及何时应使用(或不使用)Airflow。希望这能作为您进一步探索这个丰富领域的垫脚石。