Yelp收入数据管道构建实战:从需求解析到Spark ETL

本文详细介绍了Yelp如何构建收入数据管道,实现与第三方收入识别服务的集成。涵盖需求解析、数据差距分析、系统设计评估,以及使用Spark ETL处理复杂数据转换的技术方案,包括特征管理、依赖处理和UDF应用等核心实现细节。

背景

随着Yelp业务的持续增长,交易量增加和新产品服务的推出使得收入流变得更加复杂。这些变化对收入识别中涉及的手动流程提出了挑战。

如收入自动化系列第一篇文章所述,Yelp投入了大量资源现代化其计费系统,以实现收入识别流程自动化的前提条件。在本博客中,我们将分享如何构建收入数据管道,促进与第三方收入识别SaaS解决方案(以下简称REVREC服务)的集成。

我们实现自动收入识别的旅程

REVREC服务在识别收入方面提供以下好处:

  • 以最低成本和风险识别任何收入流,适用于一次性购买和订阅,以固定费率或可变价格提供
  • 实现连续会计流程,实时核对收入数据,使会计团队能够加快50%的结账速度
  • 实时预测收入,为我们提供开箱即用的报告和仪表板进行收入分析

为了获得上述所有好处,我们需要确保REVREC服务拥有正确的数据来识别收入。因此,本项目的核心是构建一个数据管道,从Yelp的生态系统中收集并生成高质量的收入数据到REVREC服务。

步骤1:处理模糊需求

我们从围绕标准收入识别流程的产品需求列表开始:

让我们看一下"定义交易金额"部分的一个示例需求:

总合同价值(TCV)应在预订时捕获,作为要识别的收入,基于每行的公允价值分配将不同于发票金额。

这些需求是为会计师编写的,但工程师很难理解。为了将其转化为工程师友好的需求并确保跨职能团队达成一致,我们遵循以下方法进行分解:

首先,我们需要创建一个词汇表,将Yelp的业务术语映射到工程术语。这样的映射快速对齐了跨职能团队的理解。对于示例需求,词汇表如下所示:

需求:总合同价值(TCV)应在预订时捕获,作为要识别的收入,基于每行的公允价值分配将不同于发票金额。

词汇表

  • 收入合同:在Yelp商务网站上下的订阅订单
  • 预订:用户完成订阅购买请求的时刻
  • 公允价值:如果我们单独销售此产品的总金额
  • 每行:我们跟踪产品功能履行和计费的最小粒度
  • 发票金额:在收入期间产品分类账上的净额

其次,我们需要提取此需求的目的,以便理解为什么我们首先需要某些数据,以及什么数据最接近业务需求。用示例计算解释也很有帮助。

目的

  • 确保收入分配基于我们单独销售此产品的价格,而不是基于我们实际向用户计费的金额

示例计算

  • 产品A单独售价为100美元
  • 产品B单独售价为20美元
  • 产品A和B的订阅捆绑包,产品A售价100美元,产品B作为附加功能免费提供
  • 订阅订单的总收入100美元应基于单独售价分配,A的份额100/120 * 100 = 83.333,B的份额20/120 * 100 = 16.667

最后,我们将原始需求转化为工程师友好的需求,如下所示:

REVREC服务要求在用户完成订阅订单时发送购买产品的总金额,此金额可能与我们实际向用户计费的金额不同。

步骤2:数据差距分析

项目团队在将Yelp的自定义订单到现金系统(详细信息可在此博客文章中找到)与通常在此类项目中运行良好的标准ETL(提取、转换和加载)架构集成时也面临挑战。这导致了数据差距分析,以使Yelp的数据与集成所需的模板对齐。

一些主要挑战和解决方案包括:

数据差距

  • 问题:Yelp系统与第三方系统之间的字段没有直接映射
  • 即时解决方案:
    • 使用近似值,例如使用总计费金额作为标价
    • 复合数据,例如通过将产品唯一ID与收入期间结合创建唯一的月度合同标识符
  • 长期解决方案:开发产品目录系统以获取通用产品属性

不一致的产品实现

  • 问题:不同产品的数据属性分散在各个数据库中
  • 即时解决方案:
    • 将来自不同表的数据预处理为统一模式
    • 按类型(例如订阅、计费、履行)分类和预处理数据
  • 长期解决方案:建议将来将计费数据模型统一到集中模式中

数据差距分析的结果不仅阐明了支持使用现状数据进行自动收入识别的即时解决方案,还为我们提供了更好的长期投资方向,使自定义订单到现金系统更接近行业标准,从而使数据映射更加直接。

步骤3:系统设计评估

在Yelp,有许多处理、流式传输和存储大规模数据的选项。我们考虑了可用选项,并评估了它们的优缺点,然后选择了适合我们需求的选项。

我们对存储和数据处理框架有以下设计选择:

选项1:MySQL + Python批处理

  • 生成财务报告的传统方法
  • 由于更改生产数据导致重新运行结果不一致以及在峰值数据量期间批处理时间缓慢而被拒绝

选项2:数据仓库 + dbt

  • 使用SQL进行数据转换,允许非工程师更新作业
  • 由于难以用SQL表示复杂逻辑而被拒绝

选项3:事件流 + 流处理

  • 具有近实时数据处理能力的成熟技术
  • 由于不需要立即数据呈现,且第三方接口不支持流集成,增加了复杂性而没有好处而被拒绝

选项4:数据湖 + Spark ETL

  • MySQL表每日快照并存储在数据湖中,然后使用Spark ETL处理
  • 首选方案,好处包括数据源和处理的独立可重现性、峰值期间的可扩展性以及强大的社区支持

最终,我们选择了数据湖 + Spark ETL选项。然而,它提出了诸如管理复杂的数据处理DAG以及将现有业务逻辑从Python高效转换为PySpark等挑战。我们将在下一节讨论如何解决这些挑战。

解决技术挑战

管理复杂的Spark ETL管道

报告收入合同数据需要来自不同来源的综合数据,包括原始mysql表快照和一些来自外部系统的预计算数据源。

它还涉及多个数据转换阶段,我们需要按类别聚合数据,通过应用转换逻辑添加其他字段,连接然后将它们映射到最终数据模板中。您可以通过查看下面的简化图了解Spark ETL管道:

管理如此复杂的ETL管道可能是一项艰巨的任务,尤其是在处理复杂的收入识别逻辑时。在Yelp,我们使用名为spark-etl的内部包来简化此过程。在本节中,我们将解释spark-etl如何帮助我们有效管理和维护ETL管道。

构建块 - Spark特征

Spark-ETL程序的构建块是Spark特征,它定义了输入、转换逻辑和输出。这些特征类似于具有请求-响应模式的Web API。

在我们的设计中,我们将Spark特征分为两类:源数据快照特征和转换特征。

源数据快照特征

源数据快照特征从S3读取数据库快照,并将数据传递到下游而不进行任何转换。然后,各种转换特征可以重用此原始数据。以下是从S3位置检索源数据的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class ARandomDatabaseTableSnapshotFeature(SparkFeature):
    alias = f"{TABLE_NAME}_snapshot"

    def __init__(self) -> None:
        self.sources = {
            TABLE_NAME: S3PublishedSource(
            	   base_s3_path=get_s3_location_for_table(TABLE_NAME),
            	   source_schema_id=get_schema_ids_for_data_snapshot(TABLE_NAME),
            	   date_col="_dt",
            	   select_cols=TABLE_SCHEMA,
            )
  }

    def transform(
        self, spark: SparkSession, start_date: date, end_date: date, **kwargs: DataFrame
    ) -> DataFrame:
        return kwargs[TABLE_NAME]

转换特征

转换特征将源数据快照特征或其他转换特征作为pyspark.DataFrame对象。它们执行各种转换,如投影、过滤、连接或应用用户定义函数。以下是转换函数中pyspark.DataFrame操作的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ARandomTransformationFeature(SparkFeature):
    def __init__(self) -> None:
        self.sources = {
            "feature_x": ConfiguredSparkFeature(),
            "feature_y": ConfiguredSparkFeature(),
        }

    def transform(
        self, spark: SparkSession, start_date: date, end_date: date, **kwargs: DataFrame
    ) -> DataFrame:
        feature_x = kwargs["feature_x"]
        feature_y = kwargs["feature_y"]

        # 基于需求转换DataFrame
        feature_x = feature_x.withColumn(
            "is_flag", lit(False).cast(BooleanType())
        )
        feature_y = feature_y.withColumn(
            "time_changed", lit(None).cast(IntegerType())
        )
        aggregated_feature = feature_x.unionByName(feature_y).drop("alignment")
        return aggregated_feature.filter(
            (
                aggregated_feature.active_period_start
                <= aggregated_feature.active_period_end
            )
            | aggregated_feature.active_period_end.isNull()
            | aggregated_feature.active_period_start.isNull()
        )

依赖管理

依赖关系由用户定义的yaml文件处理,该文件包含所有相关的Spark特征。无需在yaml文件中绘制复杂的依赖关系图。在运行时,spark-etl根据拓扑确定执行序列。

例如,如果关系表示为下面的DAG,相应的yaml配置只需要指定节点而不是边,以保持配置的简洁性,因为边已在SparkFeature.sources中定义。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
features:
    <feature1_alias>:
        class: <path.to.my.Feature1Class>
    <feature2_alias>:
        class: <path.to.my.Feature2Class>
    <feature3_alias>:
        class: <path.to.my.Feature3Class>
    <feature4_alias>:
        class: <path.to.my.Feature4Class>

publish:
    s3:
        - <feature4_alias>:
            path: s3a://bucket/path/to/desired/location
            overwrite: True

调试

鉴于Spark程序的分布式性质和延迟评估,很难设置断点并进行交互式调试,而且这个新数据管道通常处理具有数千甚至数百万行的数据帧,将中间数据帧检查点到暂存路径将是一种方便的方法,用于检查数据以进行调试,并通过指定计算昂贵的特征路径更快地恢复管道。

Yelp的Spark-etl包支持将特征检查点到暂存路径,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
spark-submit \
     	/path/to/spark_etl_runner.py \
     	--team-name my_team \
     	--notify-email my_email@example.com \
     	--feature-config /path/to/feature_config.yaml \
     	--publish-path s3a://my-bucket/publish/ \
     	--scratch-path s3a://my-bucket/scratch/ \
     	--start-date 2024-02-29 \
     	--end-date 2024-02-29 \
     	--checkpoint feature1, feature2, feature3

然后,在读取这些检查点数据时,Jupyterhub非常方便,使调试体验更加直接,并在团队中可共享。

将Python逻辑转换为PySpark

在Yelp,由于我们提供的产品种类繁多,我们的收入识别过程涉及众多业务规则。将此类逻辑转换为PySpark转换函数需要仔细设计并可能进行多次迭代。虽然PySpark的类SQL表达式(如选择、过滤和连接)功能强大,但对于复杂的业务逻辑可能不够灵活。在这种情况下,PySpark UDF(用户定义函数)为实现复杂规则(如折扣应用)提供了更灵活的解决方案。我们在各种对于类SQL表达式处理过于复杂的逻辑中使用了PySpark UDF。

PySpark中有两种类型的UDF:pyspark.sql.functions.udf和pyspark.sql.functions.pandas_udf。虽然两者都有类似的用途,但为简单起见,本演示将重点介绍前者。

UDF示例:折扣应用的业务规则

考虑以下应用折扣的简化业务规则:

  • 如果产品的活动期间完全覆盖折扣期间,则产品可以考虑折扣适用
  • 每个产品只能获得一个折扣
  • A类产品在B类产品之前获得折扣
  • 如果产品类型相同,ID较小的产品优先获得折扣
  • ID较小的折扣优先应用

UDF示例:实现

在按customer_id分组产品和折扣后,我们需要确定每个产品的折扣应用。以下是一个封装此逻辑的Python函数,如何将其应用于分组数据帧以及如何检索结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@udf(ArrayType(DISCOUNT_APPLICATION_SCHEMA))
def calculate_discount_for_products(products, discounts):
    # 基于优先级排序产品和折扣
    products = sorted(products, key=lambda x: (x['type'], x['product_id']))
    discounts = sorted(discounts, key=lambda x: x['discount_id'])

    results = []
    for product in products:
        for discount in discounts:
            if period_covers(discount['period'], product['period']) and discount['amount'] > 0:
                amount = min(product['budget'], discount['amount'])
                discount['amount'] -= amount
                results.append((product['product_id'], product['type'], product['period'], amount, discount['discount_id']))
                break

    return results

# 假设我们之前已按business_id分组了产品和折扣(此处未显示代码)
# 应用上述UDF
result_df = (
    grouped_contracts_with_biz_discounts.withColumn(
        "results",
        calculate_discount_for_products("products", "discounts"),
    )
)
# 然后通过选择分组产品和折扣的关键字(本例中为business_id)展开折扣应用
result_exploded = result_df.select(
    "business_id", explode("results").alias("exploded_item")
)

# 从展开的项目中检索产品和应用的折扣金额/ID
result_exploded = result_exploded.select(
    "business_id",
    "exploded_item.product_id",
    "exploded_item.amount",
    "exploded_item.discount_id",
)

如果不使用UDF,实现此逻辑将需要多个窗口函数,这可能难以阅读和维护。UDF提供了一种更直接和可维护的方法来在PySpark中应用复杂的业务规则。

未来改进

我们理解上述解决方案并非没有成本:

  • 我们仍然在2阶段spark作业中有50多个特征,这对于单个团队将来维护和开发可能非常具有挑战性。添加新产品需要在整个spark作业中进行更改和测试
  • 我们在计算收入期间和合同金额时严重依赖UDF。这些UDF运行成本高昂,并可能随着时间的推移影响作业的性能和可靠性

我们正在探索未来的改进以应对这些挑战:

  • 增强的数据接口和所有权:特征团队将拥有和管理用于离线消费的标准化数据接口,确保一致的数据可用性以进行分析和报告。这些接口抽象了实现细节,为团队提供了灵活性,可以在不中断报告流程的情况下进行更改
  • 简化的数据模型:简化源数据模型最小化了对自定义UDF的需求,因为映射可以使用标准PySpark函数处理
  • 统一实现:跨产品标准化实现并利用高级数据接口减少了输入表,简化了Spark特征拓扑并降低了维护开销

更多关于收入自动化系列

在本博客中,我们讨论了如何在构建Yelp自动收入识别的数据管道时处理模糊需求。我们还讨论了所做的系统设计决策以及解决的技术挑战。

在本系列的下一篇文章中,我们将讨论与确保数据完整性、核对数据差异以及与第三方系统合作的所有学习相关的话题。敬请关注!

致谢

这是一个多年期、跨组织的项目,依赖于所有团队的坚韧和协作来界定范围、设计、实施、测试并完成它。截至目前,我们已经使用这个新系统自动化了几乎所有的Yelp收入。非常感谢项目团队中的每个人的出色工作。特别感谢商务平台和财务系统领导层以及参与此项目的所有利益相关者团队的支持。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计