使用Apache Iceberg构建可复现ML系统

本文详细介绍了如何利用Apache Iceberg和SparkSQL构建可复现的机器学习系统,涵盖时间旅行、模式演进、ACID事务等核心技术,解决ML数据一致性和版本控制难题,提供实际代码示例和最佳实践。

构建可复现的ML系统:Apache Iceberg与SparkSQL开源基础

机器学习数据可复现性问题

常见痛点

数据漂移悄无声息地发生:特征分布随时间变化,直到模型开始做出不合理预测时才被发现。特征管道本应是确定性的,但实际上并非如此;由于时间戳逻辑或竞态条件,相同管道运行两次会产生微妙不同的输出。

版本控制情况更不容乐观。代码版本控制已相对成熟,但数据版本控制仍然主要依赖手动流程、电子表格和祈祷。

典型场景:同事周一运行特征工程,您周二运行,突然从相同源数据得到不同结果。因为底层表在周一至周二期间发生变化,而"时间点"逻辑并不如想象中准确。

传统数据湖的不足

数据湖为需要运行批量报告和ETL作业的分析场景设计,强调存储可扩展性而非事务完整性。这在生成季度报告时表现良好,但机器学习完全不同。

机器学习具有迭代性、实验性,并以传统分析从未需要的方式要求一致性。当模型训练作业读取部分写入的数据时,不仅会得到错误报告,更会获得从垃圾数据学习并做出垃圾预测的模型。

模式灵活性理论上很好,但实践中常导致模式混乱。没有适当的演进控制,数据科学家在向现有表添加"再多一个特征"时会意外破坏下游系统。

元数据情况更糟。传统数据湖跟踪文件而非逻辑数据集,因此在需要理解特征谱系或实施数据质量检查时基本处于盲飞状态。

Iceberg机器学习基础

真正有效的时间旅行

Iceberg的时间旅行基于快照架构,为每个写入操作维护完整的表元数据。每个快照代表表在特定时间点的一致视图,包括模式、分区等所有内容。

对机器学习而言这是革命性的:

1
2
3
4
5
6
7
-- 真正解决可复现性问题的方法
SELECT * FROM ml_features 
FOR SYSTEM_TIME AS OF '2024-01-15 10:30:00'

-- 或使用特定快照ID
SELECT * FROM ml_features 
FOR SYSTEM_VERSION AS OF 1234567890

无需再猜测哪个数据版本产生了好结果,不再有"上周还好好的"对话。可以跨时间段比较特征分布,通过检查历史数据状态分析模型性能退化,并构建真正提供一致结果的A/B测试框架。

无痛模式演进

向特征表添加新列不应需要团队会议和迁移计划。Iceberg的模式演进让您适应不断变化的需求,而不会破坏现有的读取器或写入器。

可以添加列、重命名列、重新排序列和提升数据类型,同时保持向前和向后兼容性。系统通过唯一字段ID跟踪列标识,因此重命名列不会破坏现有查询。

1
2
3
4
5
6
7
-- 添加特征确实如此简单
ALTER TABLE customer_features 
ADD COLUMN lifetime_value DOUBLE

-- 重命名不会破坏下游任何内容
ALTER TABLE customer_features 
RENAME COLUMN purchase_frequency TO avg_purchase_frequency

ACID事务(终于实现!)

ACID支持允许ML工作负载安全地操作共享数据集,而不会破坏数据或创建不一致读取。Iceberg使用乐观并发控制:多个写入器可以同时工作,但冲突会被自动检测和解决。

隔离级别防止读取器看到部分写入。因此当训练作业启动时,即使其他人正在实时更新特征,也能保证看到数据的一致快照。

构建可复现特征管道

合理的分区策略

分区秘密并不复杂:根据实际查询数据的方式在维度上进行分区。

大多数ML工作负载遵循时间模式,基于历史数据进行训练并预测近期数据。因此使用日期或时间戳列的时间分区通常是最佳选择。粒度取决于数据量:高容量系统使用每日分区,较小数据集使用每周或每月分区。

1
2
3
4
5
6
7
8
CREATE TABLE customer_features (
    customer_id BIGINT,
    feature_timestamp TIMESTAMP,
    demographic_features MAP<STRING, DOUBLE>,
    behavioral_features MAP<STRING, DOUBLE>,
    target_label DOUBLE
) USING ICEBERG
PARTITIONED BY (days(feature_timestamp))

实验数据版本控制

可复现实验需要数据版本和模型工件之间的紧密耦合。Iceberg的快照为此提供了基础,实现了将模型性能与特定数据状态关联的健壮实验跟踪。

与MLflow或类似跟踪系统的集成创建了模型运行和数据版本之间的可审计连接。每个训练作业记录输入数据集的快照ID,精确复现实验条件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import mlflow
from pyspark.sql import SparkSession

def train_model_with_versioning(spark, snapshot_id):
    # 从特定快照加载数据
    df = spark.read \
        .option("snapshot-id", snapshot_id) \
        .table("ml_features.customer_features")
    
    # 在MLflow中记录数据版本
    mlflow.log_param("data_snapshot_id", snapshot_id)
    mlflow.log_param("data_row_count", df.count())
    
    # 继续模型训练...

生产实施

真实案例:客户流失预测

处理每日数百万客户交互同时保持完全可复现性的系统。数据架构使用多个按新鲜度和访问模式组织的Iceberg表。原始事件流入暂存表,经过验证和清理后,聚合为针对ML访问模式优化的特征表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 原始事件暂存表
CREATE TABLE customer_events_staging (
    event_id STRING,
    customer_id BIGINT,
    event_type STRING,
    event_timestamp TIMESTAMP,
    event_properties MAP<STRING, STRING>,
    ingestion_timestamp TIMESTAMP
) USING ICEBERG
PARTITIONED BY (days(event_timestamp))
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'snappy'
)

-- 优化布局的特征表
CREATE TABLE customer_features (
    customer_id BIGINT,
    feature_date DATE,
    recency_days INT,
    frequency_30d INT,
    monetary_value_30d DOUBLE,
    support_tickets_30d INT,
    churn_probability DOUBLE,
    feature_version STRING
) USING ICEBERG
PARTITIONED BY (feature_date)
CLUSTERED BY (customer_id) INTO 16 BUCKETS

性能优化

Iceberg中的查询性能受益于几种互补技术。文件大小很重要:根据访问模式目标文件大小为128MB到1GB,高度选择性查询使用较小文件,全表扫描使用较大文件。

Parquet为ML工作负载提供天然优势,因为通常选择列子集。压缩选择取决于优先级:频繁访问数据使用Snappy(更快解压缩),归档数据使用Gzip(更好压缩比)。

使用聚类或Z排序的数据布局优化可以显著改善多维访问模式的性能。这些技术在文件内共置相关数据,减少典型ML查询的扫描开销。

1
2
3
4
5
6
7
-- 为典型访问模式优化表
ALTER TABLE customer_features 
SET TBLPROPERTIES (
    'write.target-file-size-bytes' = '134217728',  -- 128MB
    'write.parquet.bloom-filter-enabled.customer_id' = 'true',
    'write.parquet.bloom-filter-enabled.feature_date' = 'true'
)

最佳实践与经验教训

选择表格式

何时使用Iceberg而不是其他选项?并非总是显而易见。

Iceberg在需要强一致性保证、复杂模式演进和时间旅行功能时表现出色。ML工作负载特别受益于这些功能,因为其具有实验性质和可复现性要求。

Delta Lake提供类似功能,与某机构生态系统集成更紧密。如果主要在Databricks内操作或需要高级功能(如液体聚类),Delta可能是更好选择。

Apache Hudi为流式用例优化,具有复杂索引。考虑将其用于具有重度流式要求或复杂更新插入模式的ML系统。

有时普通的Parquet表就足够了。如果具有简单、仅追加工作负载和稳定模式,表格式的操作开销可能不值得。不要过度设计解决实际不存在问题的方案。

常见陷阱

过度分区是最常见的错误。创建数据量小于100MB或每个分区超过一万个文件的分区会损害查询规划性能。基于实际使用模式(而非理论理想)监控分区统计并调整策略。

即使有Iceberg的安全功能,模式演进错误也会破坏下游消费者。在CI/CD管道中实施模式验证,在部署前捕获不兼容更改。使用列映射功能将逻辑列名与物理存储解耦。

当团队不利用Iceberg优化功能时,经常出现查询反模式。在WHERE子句中包含分区谓词以避免不必要扫描。通过仅选择所需列而非SELECT * 来使用列修剪。

Apache Iceberg和SparkSQL为构建在生产中实际可靠工作的ML系统提供了坚实基础。时间旅行、模式演进和ACID事务的组合解决了多年来困扰ML基础设施的基本数据管理挑战。

投资通过改进开发速度、减少调试时间和增加系统可靠性信心获得回报。团队一致报告更好的实验可复现性和新模型更快的生产时间。

但成功需要围绕分区、模式设计和操作程序的深思熟虑决策。该技术提供强大功能,但需要了解底层架构和ML工作负载的特定要求才能实现收益。

随着ML系统复杂性和业务关键性增长,可靠数据基础变得越来越重要。Iceberg代表了一个成熟、生产就绪的解决方案,帮助组织构建具有与传统企业应用相同可靠性期望的ML系统。

诚实地说?是时候拥有真正按我们需要方式工作的工具了。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计