挑战
在Yelp,我们在优化机器学习模型(特别是pCTR模型)训练时间方面遇到了几个挑战。我们的主要机器学习工具是Spark,大多数数据集以Parquet格式存储在S3中。
最初,在p3-2xlarge实例上训练4.5亿个表格样本每个周期需要75小时。我们的目标是将这种效率扩展到20亿个表格样本,同时实现每个周期训练时间少于1小时。这需要创新解决方案来解决几个关键挑战:
- 数据存储:高效管理存储在S3上Parquet格式的大型分布式表格数据集,确保与Spark生态系统兼容
- 分布式训练:通过从TensorFlow的MirroredStrategy过渡到Horovod,有效扩展到多个GPU
技术实现
数据存储
我们选择将物化训练数据以Parquet数据集形式存储在S3中,原因如下:
- 兼容性:与Yelp的Spark生态系统无缝集成
- 效率:Parquet高度压缩且针对I/O操作优化
- 可扩展性:S3提供几乎无限的存储容量
- 性能:S3在并行访问时可以支持非常高的吞吐量
在将训练数据物化到S3后,需要读取并将其转换为TensorFlow数据集。我们最初的方法是使用Petastorm,它非常易于使用且与Spark集成良好。然而,我们发现Petastorm不适合我们的用例,因为在使用重新批处理方法实现所需批次大小时速度要慢得多。
为了解决这一挑战,我们探索了替代解决方案,发现TensorFlow I/O有一个称为ArrowStreamDataset的实现。通过直接使用TensorFlow数据集,我们可以避免依赖Python生成器,后者以可移植性和可扩展性有限而闻名。经过测试,我们发现这种方法在我们的用例中表现显著更好。
方法 | 时间 |
---|---|
boto3读取原始数据 | 18.5秒 |
Petastorm(每个行组一批) | 76.4秒 |
Petastorm(重新批处理到4096) | 815秒 |
ArrowStream(批处理到4096) | 19.2秒 |
转换900万样本时间 |
不幸的是,ArrowStreamDataset仅支持以Feather格式存储在本地磁盘上的数据集,这与我们的生态系统不兼容,并且限制了我们扩展到必要数据集大小的能力。
为了使用ArrowStreamDataset并更有效地从S3流式传输数据,我们使用PyArrow实现了ArrowStreamServer。ArrowStreamServer读取和批处理数据集,通过套接字将其作为RecordBatch流提供服务。每个ArrowStreamServer在单独的进程中运行以获得更好的并行性。
在消费者端,我们利用ArrowStreamDataset从端点读取RecordBatch,实现数据集的高效批处理和交错。
分布式训练
随着训练数据从数亿样本增长到数十亿样本,我们采用跨多个GPU的分布式训练来改善训练时间。最终我们选择Horovod作为默认分发方法。
最初,我们使用TensorFlow内置的MirroredStrategy进行分布式训练。在4个GPU上我们获得了几乎线性的加速。然而,当我们从4个GPU扩展到8个GPU时,我们发现MirroredStrategy不是最优的,导致GPU、CPU和IO指标较低。瓶颈被确定在Keras数据处理器中,该处理器难以在设备间分片数据集。
在观察到TensorFlow内置策略的瓶颈后,我们决定尝试Horovod更复杂的分布式训练能力。在我们的测试中,我们发现与使用单个GPU相比,Horovod在最多8个GPU上提供了线性性能扩展。我们认为Horovod性能优越的原因如下:
- 高效的进程管理:Horovod每个设备使用一个进程,优化了资源利用率并避免了Python中的GIL
- 梯度转换:通过将稀疏梯度转换为密集梯度,Horovod在全归约操作期间显著提高了内存效率和速度
切换到Horovod带来了几个挑战,主要是由于使用Keras的预制WideDeepModel和多GPU机器上资源管理的复杂性。
为了支持Keras,Horovod实现了DistributedOptimizer来包装和覆盖_compute_gradients方法,但Keras的预制WideDeepModel直接调用GradientTape而不是调用minimize来支持两个优化器情况。为了解决这个问题,我们必须重写WideDeepModel的train_step。
我们还遇到了"线程风暴"和内存不足问题,因为创建了数千个线程。为了防止核心和内存的过度订阅,我们设计了一种方法,将这些库使用的线程池从所有可用资源缩减到每个GPU的可用资源。
设置 | 值 |
---|---|
tf.data.Options.threading.private_threadpool_size | 每个GPU的CPU核心数 |
tf.data.Options.autotune.ram_budget | 每个GPU的可用主机内存 |
OMP_NUM_THREADS | 每个GPU的CPU核心数 |
TF_NUM_INTEROP_THREADS | 1 |
TF_NUM_INTRAOP_THREADS | 每个GPU的CPU核心数 |
基于GPU拆分资源的设置 |
总结
为了将所有这一切整合并纳入Yelp现有的Spark ML生态系统,我们设计了一个KerasEstimator作为ML管道中的Spark Estimator。如下图所示,我们首先将转换后的特征物化到S3中,并在训练Spark Executors上使用ArrowStreamServer提供服务,然后利用ArrowStreamDataset从ArrowStreamServer流式传输训练数据。TFMirrorRunner和HorovodRunner作为SparkRunner的两个具体实现,用于在Spark执行器中设置和训练Keras模型。
结果和收益
性能改进
我们在包含20亿样本的数据集上使用pCTR模型进行的基准测试显示了实质性改进。通过使用ArrowStream优化数据存储,我们从4.5亿样本的75小时起点实现了85.8倍加速。此外,实施分布式训练提供了额外的16.9倍加速,导致总加速约1400倍。这些结果强调了我们的方法在优化速度和成本方面的有效性。
注意:除A10G外,所有测试均使用AWS P3实例完成,A10G是G5dn实例。
结论
我们向使用TensorFlow与Horovod的过渡显著加速了机器学习训练过程,降低了成本并提高了开发速度。这种方法不仅解决了我们面临的挑战,还为未来的可扩展性和效率改进奠定了基础。IO通常是神经网络训练的瓶颈,特别是对于表格数据集和相对简单的模型。改进IO等同于提高训练性能。对于训练中小型模型,AWS G系列实例通常优于P系列实例,特别是如果我们考虑成本因素。
致谢
感谢Nathan Sponberg实现KerasEstimator。