增强Yelp的神经网络训练:通过WideAndDeep实现1400倍加速
在Yelp,我们遇到了挑战,促使我们改进广告收入生成模型的训练时间,这些模型使用宽深神经网络架构来预测广告点击率(pCTR)。这些模型处理具有小参数空间的大型表格数据集,需要创新的数据解决方案。这篇博客文章深入探讨了我们使用TensorFlow和Horovod优化训练时间的历程,以及ArrowStreamServer的开发——这是我们内部用于低延迟数据流和服务的库。这些组件共同使我们能够在关键业务模型的训练上实现1400倍的加速,相比使用单个GPU和Petastorm。
挑战
在Yelp,我们在优化机器学习模型(特别是pCTR模型)的训练时间方面遇到了几个挑战。我们的主要机器学习工具是Spark,大多数数据集以Parquet格式存储在S3中。
最初,在p3-2xlarge实例上训练4.5亿个表格样本每个周期需要75小时。我们的目标是将这种效率扩展到20亿个表格样本,同时实现每个周期训练时间少于1小时。这需要创新解决方案来解决几个关键挑战:
- 数据存储:有效管理存储在S3上Parquet格式的大型分布式表格数据集,以确保与我们Spark生态系统的兼容性。由于我们数据的表格性质,Petastorm效率低下,因此我们开发了ArrowStreamServer来提高流性能。
- 分布式训练:通过从TensorFlow的MirroredStrategy过渡到Horovod,有效扩展到多个GPU。
技术实现
数据存储
我们选择将物化训练数据存储在S3上的Parquet数据集中,原因如下:
- 兼容性:与Yelp的Spark生态系统无缝集成。
- 效率:Parquet高度压缩并针对I/O操作进行了优化。
- 可扩展性:S3提供几乎无限的存储容量。
- 性能:S3在并行访问时可以支持非常高的吞吐量。
将训练数据物化到S3后,需要读取并转换为TensorFlow数据集。我们的第一种方法是使用Petastorm,它非常易于使用且与Spark集成良好。然而,我们发现Petastorm不适合我们的用例,因为在使用重新批处理方法实现所需批次大小时速度要慢得多。这种低效率是由于Yelp专注于具有数百个特征和每行组数十万行的表格数据集,其中解批处理会导致张量爆炸并创建数百万个张量。
为了解决这个挑战,我们探索了替代解决方案,并发现TensorFlow I/O有一个称为ArrowStreamDataset的实现。通过直接使用TensorFlow数据集,我们可以避免依赖Python生成器,后者已知存在可移植性有限和可扩展性问题。经过测试,我们发现这种方法在我们的用例中表现显著更好。
| 方法 | 时间 |
|---|---|
| boto3读取原始数据 | 18.5秒 |
| Petastorm(每个行组一个批次) | 76.4秒 |
| Petastorm(重新批处理到4096) | 815秒 |
| ArrowStream(批处理到4096) | 19.2秒 |
| 转换900万样本的时间 |
不幸的是,ArrowStreamDataset仅支持以Feather格式存储在本地磁盘上的数据集,这与我们的生态系统不兼容,并限制了扩展到必要数据集大小的能力。
为了使用ArrowStreamDataset并更有效地从S3流式传输数据,我们使用PyArrow实现了ArrowStreamServer。ArrowStreamServer读取和批处理数据集,通过套接字将它们作为RecordBatch流提供服务。每个ArrowStreamServer在单独的进程中运行以实现更好的并行性。
在消费者端,我们利用ArrowStreamDataset从端点读取RecordBatch,实现数据集的高效批处理和交错。
分布式训练
随着训练数据从数亿样本增长到数十亿样本,我们采用跨多个GPU的分布式训练来改善训练时间。最终我们选择Horovod作为默认的分布式方法。
最初,我们使用TensorFlow内置的MirroredStrategy进行分布式训练。在4个GPU上我们获得了几乎线性的加速。然而,当我们从4个GPU扩展到8个GPU时,我们发现MirroredStrategy不是最优的,导致GPU、CPU和I/O指标低下。瓶颈被确定在Keras数据处理器中,它在跨设备分片数据集时遇到困难。
在观察到TensorFlow内置策略的瓶颈后,我们决定尝试Horovod更复杂的分布式训练能力。在我们的测试中,我们发现Horovod在最多8个GPU上提供了与单个GPU相比的线性性能扩展。我们认为Horovod优越性能的原因如下:
- 高效的进程管理:Horovod每个设备使用一个进程,优化了资源利用率并避免了Python中的GIL。
- 梯度转换:通过将稀疏梯度转换为密集梯度,Horovod显著提高了全归约操作期间的内存效率和速度。我们建议对于具有大批次大小和众多分类特征的模型将稀疏梯度转换为密集梯度。
切换到Horovod带来了几个挑战,主要是由于使用Keras的预制WideDeepModel和多GPU机器上资源管理的复杂性。
为了支持Keras,Horovod实现了DistributedOptimizer来包装和覆盖_compute_gradients方法,但Keras的预制WideDeepModel直接调用GradientTape而不是调用minimize来支持两个优化器的情况。为了解决这个问题,我们必须覆盖WideDeepModel的train_step。
我们还遇到了"线程风暴"和内存不足问题,因为创建了数千个线程。为了防止核心和内存的过度订阅,我们设计了一种方法,将这些库使用的线程池从所有可用资源缩减到每个GPU的可用资源。
| 设置 | 值 |
|---|---|
| tf.data.Options.threading.private_threadpool_size | 每个GPU的CPU核心数 |
| tf.data.Options.autotune.ram_budget | 每个GPU的可用主机内存 |
| OMP_NUM_THREADS | 每个GPU的CPU核心数 |
| TF_NUM_INTEROP_THREADS | 1 |
| TF_NUM_INTRAOP_THREADS | 每个GPU的CPU核心数 |
| 基于GPU的资源分割设置 |
总结
为了将所有这些东西整合并纳入Yelp现有的Spark ML生态系统,我们设计了一个KerasEstimator作为我们ML管道中的Spark Estimator。如下图所示,我们首先将转换后的特征物化到S3,并在训练Spark Executors上使用ArrowStreamServer提供服务,然后我们利用ArrowStreamDataset从ArrowStreamServer流式传输训练数据。TFMirrorRunner和HorovodRunner作为SparkRunner的两个具体实现,用于在Spark执行器中设置和训练Keras模型。
结果和收益
性能改进
我们在具有20亿样本的数据集上使用pCTR模型进行的基准测试显示了显著的改进。通过使用ArrowStream优化数据存储,我们从4.5亿样本的75小时起点实现了85.8倍的加速。此外,实施分布式训练提供了额外的16.9倍加速,导致总加速约为1400倍。这些结果强调了我们的方法在优化速度和成本方面的有效性。
注意:除A10G外,所有测试均使用AWS P3实例完成,A10G是G5dn实例。
结论
我们向使用TensorFlow与Horovod的过渡显著加速了我们的机器学习训练过程,降低了成本并提高了开发人员速度。这种方法不仅解决了我们面临的挑战,还为未来的可扩展性和效率改进奠定了基础。I/O通常是神经网络训练的瓶颈,特别是对于表格数据集和相对简单的模型。改进I/O等同于提高训练性能。对于训练中小型模型,AWS G系列实例通常优于P系列实例,特别是如果我们考虑成本。
致谢
感谢Nathan Sponberg实现了KerasEstimator。
脚注
- 高达数十太字节 ↩
- AWS S3可以向单个EC2实例提供高达100Gb/s的带宽。简介 - 最佳实践设计模式:优化Amazon S3性能 ↩
- Parquet行组是Petastorm中的默认批次大小 ↩
- 我们使用p3.8xlarge和p3.16xlarge ↩
- 在我们的案例中,我们有数百个分类特征和32K-256K的批次大小 ↩
- Spark上Tensorflow MirrorStrategy的包装器 ↩
- Spark上Horovod的包装器 ↩