Azure Data Factory 全面解析:ETL 数据集成实战指南

本文深入探讨Azure Data Factory的核心功能、实际应用场景和最佳实践,包含完整的数据处理管道构建示例,帮助读者掌握这一强大的云端数据集成服务。

你应该使用 Azure Data Factory 吗?

是的,适用于大多数 ETL 场景。ADF 能很好地处理数据移动、基本转换和调度,只是不要期望它能替代复杂业务逻辑的自定义代码。

这到底是什么东西?

把 Azure Data Factory 想象成终极的数据移动服务。就像一个智能传送带系统,可以从任何地方抓取数据,对其进行处理,然后转储到其他地方。需要从 SQL 数据库提取客户数据,清理后推送到数据湖?ADF 能帮你搞定。

最好的部分是什么?你不需要编写大量自定义代码或管理服务器。微软负责繁重的工作,而你只需关注"做什么"而不是"如何做"。

管道:你的数据工作流

这是最有趣的部分。ADF 中的管道基本上是一个工作流,是你的数据经历的一系列步骤。把它想象成一个食谱:

  • 获取食材(提取数据)
  • 准备食材(转换数据)
  • 烹饪食材(处理数据)
  • 上菜(加载到目的地)

管道中的每个步骤称为"活动"。你可能有一个复制活动来将数据从点 A 移动到点 B,一个数据流活动来清理和转换数据,或者一个存储过程活动来运行一些自定义 SQL 逻辑。

真正重要的东西

复制活动

这是你的面包和黄油。复制活动可能占你使用功能的 80%。它们简单但功能强大。你告诉它从哪里获取数据,放在哪里,以及沿途想要的任何基本转换。

连接器库非常庞大——SQL Server、Oracle、MongoDB、REST API、平面文件,应有尽有。我曾经用它从一些我认为无法集成的非常奇怪的遗留系统中提取数据。

数据流

当你需要做的不仅仅是复制数据时,数据流就是你的朋友。把它们看作是可视化的 ETL(提取、转换、加载)过程。你可以拖放转换,如连接、聚合和过滤,而无需编写 SQL 或代码。

起初学习曲线有点陡峭,但一旦掌握了,你就可以快速构建复杂的数据转换。此外,它在底层生成 Spark 代码,因此扩展性很好。

触发器

除非有东西启动,否则 ADF 中什么也不会发生。触发器是你调度管道或让它们响应事件的方式。

你有基本的时间表触发器(每天凌晨 2 点运行)、翻转窗口触发器(分块处理数据)和基于事件的触发器(当文件落地到 blob 存储时运行)。基于事件的触发器对于构建实时数据处理特别方便。

现实检查

让我们诚实地面对你将面临的情况:

优点

  • 无需管理基础设施
  • 自动扩展
  • 与所有微软产品(以及大多数非微软产品)集成
  • 可视化界面,非开发人员也能理解
  • 内置监控和日志记录

痛点

  • 当出现问题时,调试可能是一场噩梦
  • 可视化设计器有时感觉笨拙
  • 如果不小心,定价可能会很昂贵
  • 当你需要非常自定义的逻辑时,功能有限
  • 版本控制…不太好

实战经验提示

  • 从小处着手:不要试图在一个庞大的管道中构建整个数据架构。将事情分解成更小、可管理的块。相信我这一点。
  • 使用参数:一切都应该参数化。源路径、目标表、日期范围,使其全部可配置。
  • 监控一切:为失败的管道运行设置警报。没有什么比发现你的关键数据加载三天前失败更糟糕的了。
  • 在较低环境中测试:ADF 没有很好的本地开发故事,因此拥有适当的开发/测试环境至关重要。
  • 学习表达式语言:ADF 有自己的动态内容表达式语言。起初很奇怪,但一旦熟悉了,你可以做一些相当酷的事情。

何时使用 ADF(以及何时不使用)

Azure Data Factory 非常适合:

  • 在 Azure 服务之间移动数据
  • 构建传统的 ETL 管道
  • 集成云和本地系统
  • 当你需要业务用户可以理解的东西时

可能不太适合:

  • 实时流处理(尽管它可以处理近实时)
  • 复杂的业务逻辑(坚持使用更简单的转换)
  • 当你需要毫秒级延迟时
  • 如果你还没有进入 Azure 生态系统

真实示例:处理每日客户数据文件

让我带你了解我构建的一个真实项目,处理每天放入 blob 存储的客户数据 CSV 文件,并将它们加载到 SQL 数据库中进行报告。

场景

每天早上 6 点,我们的系统将包含昨天客户数据的 CSV 文件转储到 Azure 存储帐户中。我们需要:

  1. 验证文件是否存在并且有数据
  2. 清理和转换数据
  3. 将其加载到我们的报告数据库中
  4. 归档处理过的文件
  5. 如果任何步骤失败,发送通知

设置链接服务

首先,你需要定义你的连接。这是连接到 blob 存储的 JSON 样子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
    "name": "BlobStorageLinkedService",
    "type": "Microsoft.DataFactory/factories/linkedservices",
    "properties": {
        "type": "AzureBlobStorage",
        "typeProperties": {
            "connectionString": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "KeyVaultLinkedService",
                    "type": "LinkedServiceReference"
                },
                "secretName": "storage-connection-string"
            }
        }
    }
}

这是 SQL 数据库连接:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
    "name": "SqlDatabaseLinkedService",
    "type": "Microsoft.DataFactory/factories/linkedservices", 
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "KeyVaultLinkedService",
                    "type": "LinkedServiceReference"
                },
                "secretName": "sqldb-connection-string"
            }
        }
    }
}

主管道

这是协调一切的管道。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
{
    "name": "ProcessDailyCustomer",
    "properties": {
        "parameters": {
            "ProcessDate": {
                "type": "string",
                "defaultValue": "@formatDateTime(utcNow(), 'yyyy-MM-dd')"
            }
        },
        "activities": [
            {
                "name": "CheckFileExists",
                "type": "GetMetadata",
                "typeProperties": {
                    "dataset": {
                        "referenceName": "CustomerFileDataset",
                        "type": "DatasetReference",
                        "parameters": {
                            "fileName": "@concat('sales_', pipeline().parameters.ProcessDate, '.csv')"
                        }
                    },
                    "fieldList": ["exists", "itemName", "size"]
                }
            },
            {
                "name": "ProcessCustomerData",
                "type": "ExecuteDataFlow",
                "dependsOn": [
                    {
                        "activity": "CheckFileExists",
                        "dependencyConditions": ["Succeeded"]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 2
                },
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "TransformCustomerData",
                        "type": "DataFlowReference",
                        "parameters": {
                            "processDate": {
                                "value": "@pipeline().parameters.ProcessDate",
                                "type": "Expression"
                            }
                        }
                    }
                }
            },
            {
                "name": "ArchiveProcessedFile",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "ProcessCustomerData", 
                        "dependencyConditions": ["Succeeded"]
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource"
                    },
                    "sink": {
                        "type": "DelimitedTextSink"
                    }
                }
            }
        ]
    }
}

用于转换的数据流

这是实际数据处理发生的地方:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
    "name": "TransformCustomerData",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "CustomerFileDataset",
                        "type": "DatasetReference"
                    },
                    "name": "CustomerSource"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "CustomerTableDataset", 
                        "type": "DatasetReference"
                    },
                    "name": "CustomerDB"
                }
            ],
            "transformations": [
                {
                    "name": "FilterValidRecords",
                    "description": "Remove records with missing required fields"
                },
                {
                    "name": "CalculateAge",
                    "description": "Calculate age from date of birth and add customer fields"
                },
                {
                    "name": "LookupContactInfo", 
                    "description": "Validate email format and standardize phone numbers"
                }
            ],
            "script": "source(output(\n\t\tCustomerID as string,\n\t\tFirstName as string,\n\t\tLastName as string,\n\t\tEmail as string,\n\t\tPhone as string,\n\t\tDateOfBirth as date,\n\t\tAddress as string,\n\t\tCity as string,\n\t\tState as string,\n\t\tZipCode as string,\n\t\tRegistrationDate as timestamp\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> CustomerSource\n\nCustomerSource filter(!isNull(CustomerID) && !isNull(Email) && !isNull(FirstName) && !isNull(LastName)) ~> FilterValidCustomers\n\nFilterValidCustomers derive(CleanEmail = lower(trim(Email)),\n\t\tCleanPhone = regexReplace(Phone, '[^0-9]', ''),\n\t\tIsValidEmail = contains(Email, '@') && contains(Email, '.')) ~> StandardizeContactInfo\n\nStandardizeContactInfo derive(Age = toInteger(daysBetween(DateOfBirth, currentDate()) / 365),\n\t\tFullName = concat(FirstName, ' ', LastName),\n\t\tCustomerTenure = toInteger(daysBetween(RegistrationDate, currentTimestamp()) / 365),\n\t\tAgeGroup = case(\n\t\t\tAge < 25, 'Young Adult',\n\t\t\tAge < 45, 'Adult', \n\t\t\tAge < 65, 'Middle Age',\n\t\t\t'Senior'\n\t\t)) ~> EnrichCustomerProfile\n\nEnrichCustomerProfile derive(FullAddress = concat(Address, ', ', City, ', ', State, ' ', ZipCode),\n\t\tTenureSegment = case(\n\t\t\tCustomerTenure < 1, 'New',\n\t\t\tCustomerTenure < 3, 'Growing',\n\t\t\tCustomerTenure < 5, 'Established',\n\t\t\t'Loyal'\n\t\t),\n\t\tProcessedDate = currentTimestamp()) ~> DeriveCustomerMetrics\n\nDeriveCustomerMetrics sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tinput(\n\t\tCustomerID as string,\n\t\tFirstName as string,\n\t\tLastName as string,\n\t\tFullName as string,\n\t\tEmail as string,\n\t\tCleanEmail as string,\n\t\tPhone as string,\n\t\tCleanPhone as string,\n\t\tIsValidEmail as boolean,\n\t\tDateOfBirth as date,\n\t\tAge as integer,\n\t\tAgeGroup as string,\n\t\tAddress as string,\n\t\tCity as string,\n\t\tState as string,\n\t\tZipCode as string,\n\t\tFullAddress as string,\n\t\tRegistrationDate as timestamp,\n\t\tCustomerTenure as integer,\n\t\tTenureSegment as string,\n\t\tProcessedDate as timestamp\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> CustomerDB"
        }
    }
}

设置触发器

这是设置每日触发器的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
    "name": "DailyCustomerProcessTrigger",
    "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Day",
                "interval": 1,
                "startTime": "2024-01-01T07:00:00Z",
                "timeZone": "UTC",
                "schedule": {
                    "hours": [7],
                    "minutes": [0]
                }
            }
        },
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "ProcessDailyCustomer",
                    "type": "PipelineReference"
                }
            }
        ]
    }
}

现实世界中的陷阱

  • 文件命名约定:确保你的文件命名一致。当有人更改日期格式并破坏一切时,我艰难地学到了这一点。
  • 错误处理:始终添加适当的错误处理。在上面的示例中,我会为失败添加电子邮件通知。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
{
    "name": "SendFailureEmail",
    "type": "WebActivity", 
    "dependsOn": [
        {
            "activity": "ProcessSalesData",
            "dependencyConditions": ["Failed"]
        }
    ],
    "typeProperties": {
        "url": "https://your-logic-app-webhook-url",
        "method": "POST",
        "body": {
            "subject": "Daily Sales Processing Failed",
            "message": "@concat('Pipeline failed for date: ', pipeline().parameters.ProcessDate)",
            "priority": "High"
        }
    }
}
  • 测试:创建一个用于测试的单独管道,使用较小的数据集。不要用生产数据测试——这永远不会有好结果。

实际实践中的样子

当你部署这个时,每天早上会发生以下情况:

  1. 触发器在早上 7 点触发。
  2. 管道检查客户文件是否存在。
  3. 如果找到,数据流在大约 3 分钟内处理约 50,000 条记录。
  4. 数据被加载到 SQL 数据库中。
  5. 原始文件被移动到存档文件夹。
  6. 你会收到成功通知(如果出现问题,则收到失败警报)。

入门指南

以下是我实际推荐的入门步骤:

  1. 首先构建一个简单的复制管道:将一些数据从 A 移动到 B。熟悉界面。
  2. 添加一些基本转换:在数据流中尝试查找或条件拆分。
  3. 设置监控和警报:学习如何读取运行历史记录并设置通知。
  4. 尝试触发器:从简单的时间表开始,然后尝试基于事件的触发器。
  5. 深入研究参数和变量:这是 ADF 真正开始闪耀的地方。

底线

Azure Data Factory 并不完美,但对于大多数数据集成场景来说,它相当可靠。它使你免于编写大量样板代码和管理基础设施。可视化界面使向利益相关者解释数据管道的工作更容易。

只需记住:从简单开始,参数化一切,不要试图强迫 ADF 做它没有设计用于做的事情。它是一个工具,而不是魔杖。

当它工作时,你会看起来像一个数据集成向导。当它不工作时…嗯,这就是 StackOverflow 的用途。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计