构建生产级数据转换管道的实战经验

本文分享了构建生产级数据转换管道的实战经验,包括如何权衡数据映射的完整性、应用80/20原则进行优先级排序,以及通过向量化技术实现规模化处理,为大规模数据迁移项目提供实用指导。

构建生产级数据转换管道的经验教训

将大规模企业数据在系统间转换,与其追求完美,不如在权衡取舍、规模化工程和灵活性方面做出正确决策。

1. 拥抱有意识的不完美

我们面临的第一个残酷现实是:试图100%无损失地映射所有字段是不切实际的。

遗留企业系统在孤岛中演进,通常经过数十年的增量更新。一个系统可能使用自由文本标签,而另一个系统需要枚举代码来表示相同的概念。试图在不同系统之间构建无损的通用映射不仅不切实际,而且适得其反。

在我们的项目中,我们需要决定哪些地方需要精确,哪些地方不需要。在我们的管道中,输入系统中最重要的字段之一是诊断代码。但有一个问题——我们的输入数据源使用的诊断表示格式比输出系统要求的粒度要粗,这意味着如果我们想要产生有效的输出,我们需要假设将输入映射到哪个输出诊断。

我们最终确定可用性比精确性更重要,并将每个较高级别的诊断代码映射到最常观察到的细粒度对应项。这是一个必然不精确的假设,但这是许多数据转换项目中出现的常见问题和模式,即使在医疗保健领域之外也是如此。

这种方法不会让完美主义者满意,但这使我们的输出在生产中可用。一个警告——这些方法要求我们承认所构建内容的不完美性,并准确记录输出的哪些部分故意引入了不精确性——这将有助于我们在将输出放入下游业务逻辑时理解其价值。

正如著名统计学家George Box曾经说过的:“所有模型都是错误的,但有些是有用的。"——将这一点应用于我们在数据工程中必须做出的权衡也不为过。

2. 严格执行80/20规则

在执行系统间迁移或转换数据时,很容易追求完整性:映射每个字段,处理每个边缘情况。但即使在可能的情况下,这种方法也可能不可取,特别是在竞争优先级和有限工程资源的世界中。

我们很快了解到,运行该项目最有效和高价值的方式是从业务逻辑和下游系统的需求反向工作。事后看来,医疗保健索赔数据是我们学习这一教训的完美领域。索赔数据范围广泛——有数百个字段代表医疗保健索赔的不同细微差别——程序代码、计费代码、收入代码等。这迫使我们进行谈判,优先考虑哪些字段将在下游业务逻辑中被实际使用,更不用说重要了。由于下游系统本身是一个机器学习模型,这也迫使我们探究哪些字段在做出预测时实际上具有任何(字面意义上的)权重。

在我们的案例中,我们了解到下游模型只关心少数关键字段。我们没有试图解决所有问题,而是加倍关注这些字段,并将其余字段的优先级降低。然而,我们不仅仅停留在列级别。一旦确定了重要的列,我们也很快意识到少数行不遵守所谓的模式:

  • 本应为5位数的CPT(程序)代码,有时要长得多
  • 零填充的位置代码
  • 编码方式不同的时间戳字段(有些是无时区的,有些是带时区的)

在这些不一致性中,我们必须重新应用相同的优先级逻辑。从这个意义上说,这意味着要弄清楚哪些问题值得解决,以及在解决每个问题上投入多少时间和精力。例如,零填充很容易通过子字符串调用解决,但理解我们的程序代码何时以及为何比应有的长度任意更长则不容易,特别是当没有可辨别的模式,并且这个问题只影响不到1%的相关行时。我们继续做出无情的优先级决策,使我们能够按时交付项目。

这遵循经典的帕累托原则(也称为80/20规则):20%的字段驱动80%的价值。

3. 从一开始就通过向量化设计规模

即使范围缩小,大规模数据管道通常很快达到生产规模。在我们的案例中,一旦我们连接多个表,我们的索赔数据集很快达到数百万行。在这个规模下,在本地机器上使用测试数据集似乎运行无缝的逻辑通常在生产中会卡顿和摇摆。

一个帮助我们从一开始就扩展的重要实践是尽可能选择向量化而不是迭代。

逐行操作(for循环和像.apply()这样的逐行函数)易于理解和编写,但在规模上最终会成为性能杀手。相反,优先使用向量化的数据框操作或数据库连接。当我们想要在索赔数据集上创建一个派生列,指示同一患者的本次索赔与上次索赔之间经过了多少时间(以天为单位)时,我们遇到了这个挑战的完美例子。虽然我们可以编写一个逐行函数来在数据框中搜索同一患者的上次索赔,但这将花费大量时间(因此也是计算资源)来运行。

相反,我们选择了一种更智能、更简化的方法——我们使用窗口函数对每个患者的索赔进行编号,将每个患者的时间戳列"滞后"一个位置,然后只需在两个时间戳列之间进行列式日期差计算。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pandas as pd

# --- 示例输入,从我们的索赔数据集中泛化 ---
df = pd.DataFrame({
    "patient_id": [101, 101, 102, 101, 102],
    "claim_id":   [  1,   2,   3,   4,   5],
    "claim_ts":   pd.to_datetime([
        "2024-01-10 09:00",
        "2024-01-12 15:00",
        "2024-01-11 08:00",
        "2024-02-01 10:30",
        "2024-01-20 12:00"
    ])
})

# --- 示例向量化窗口方法 ---
# 1) 按患者+时间排序(基本上是一个窗口函数,适合那些更熟悉SQL的人)
df = df.sort_values(["patient_id", "claim_ts"], kind="stable")

# 2) 对每个患者的索赔进行编号,并通过shift"滞后"以获取先前的时间戳
df["claim_seq"] = df.groupby("patient_id").cumcount() + 1
df["prev_claim_ts"] = df.groupby("patient_id")["claim_ts"].shift(1)

# 3) 列式时间差(以天为单位)
df["delta_since_prev_days"] = (df["claim_ts"] - df["prev_claim_ts"]).dt.days

这可能看起来像一个实现细节,但它代表了构建企业管道时重要的设计决策哲学。

结论

构建生产级数据转换管道可能不是光鲜的工作,但混乱的权衡决策和小数据处理技巧代表了对于下游业务用例重要的基础步骤。在分享我们将HIE数据转换为索赔数据的旅程中的一些经验时,我希望您可以在构建自己的企业数据管道时利用这些学习成果。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计