利用Spark 3和NVIDIA GPU为大数据管道降低云成本高达70%
作者:Ilay Chen和Tomer Akirav
在PayPal,每小时有数十万个Apache Spark作业运行,处理PB级数据并需要大量资源。为应对机器学习解决方案的增长,PayPal需要可扩展的环境、成本意识和持续创新。本文阐述了Apache Spark 3和GPU如何帮助企业将大数据处理和AI应用的Apache Spark作业云成本降低高达70%。
我们的旅程将从简要介绍Spark RAPIDS开始——这是Apache Spark的加速器,利用GPU通过RAPIDS库加速处理。然后我们将回顾PayPal基于CPU的Spark 2应用,升级到Spark 3及其新功能的过程,探索将Apache Spark应用迁移到GPU集群的方法,以及如何调优Spark RAPIDS参数。最后我们将讨论遇到的一些挑战和更新带来的好处。
AI生成的云端天平秤图像
背景
GPU无处不在,其并行特性非常适合处理AI和图形应用等。对于不熟悉的人来说:GPU与CPU在计算方面的区别在于,CPU具有有限数量的非常强大的核心,而GPU拥有数千甚至数万个相对较弱但协同工作良好的核心。PayPal已经利用GPU训练模型一段时间了,因此我们决定评估GPU的并行性是否有助于处理基于Apache Spark的大数据应用。
在研究过程中,我们遇到了NVIDIA的Spark RAPIDS开源项目。它有许多用途,但我们专注于Spark RAPIDS的成本降低潜力,因为像PayPal这样的企业在云中运行Spark作业花费了大量资金。在行业中,将Spark与GPU结合使用尚不常见,但根据本文所述的发现,潜在收益可能非常巨大。
什么是Spark RAPIDS?
Spark RAPIDS是一个支持在Spark应用中使用GPU的项目。NVIDIA团队调整了Apache Spark的设计以利用GPU的强大能力。它对于大型连接、分组、排序和类似功能非常有益。Spark RAPIDS可以提升某些工作负载的性能,我们将在后面讨论其识别过程。您可以查看文档了解更多详情。
使用Spark RAPIDS通过GPU加速大数据处理有几个原因:GPU有自己的环境和编程语言,因此我们不能轻易在其上运行Python/Scala/Java/SQL代码。必须将代码转换为GPU编程语言,而Spark RAPIDS以透明的方式完成这种转换。Spark RAPIDS做出的另一个很酷的设计改变是Spark如何处理作业Spark计划每个阶段中的任务。在纯Spark中,阶段的每个任务都被发送到集群中的单个CPU核心。这意味着并行性处于任务级别。在Spark RAPIDS中,并行性是任务内的,意味着任务被并行化,每个任务内的数据处理也被并行化。GPU是强大的计算处理器,这激励我们操纵作业使其更受计算限制,从而处理大型分区。
NVIDIA提供的任务级并行性与数据级并行性对比
如需更多信息和详细解释,我们建议阅读NVIDIA的书籍《加速Apache Spark 3》。
入门
我们在PayPal研究环境中对Spark RAPIDS的初步实验取得了成功,这是一个可以访问网络但资源有限且没有生产数据的开放环境。下一步是将加速器投入生产,以衡量实际生产应用。
根据Spark RAPIDS文档,并非所有作业都适合此加速器,因此我们努力寻找最相关的作业。我们从一个Spark 2(CPU集群)作业开始,该作业处理大量数据(多个约10TB的输入),在异常大的数据集上执行SQL操作,使用密集的shuffle,并且需要相当多的机器。根据NVIDIA的资格工具预测,该作业具有很高的成功率,该工具分析基于CPU的Spark应用的Spark事件,以帮助量化将Spark应用迁移到GPU集群的预期加速效果。
如上所述,我们理解为了充分利用GPU,必须操纵Spark作业以处理大型分区。我们处理大分区的目标是操纵查询和操作使其更受计算限制,而不是受I/O或网络限制,从而有效利用GPU。
为了操纵作业以处理大型分区,我们更改了两个参数:AQE(自适应查询执行)参数,这是Spark 3中的一种新优化技术,除其他外,它调整shuffle阶段中的分区数量,使每个分区达到特定大小。第二个参数是spark.sql.files.maxPartitionBytes,它处理输入分区的大小。这些shuffle/输入阶段中的分区数量也会影响许多伴随阶段。
对于基线运行,我们没有设置spark.sql.files.maxPartitionBytes参数,因此Spark计划使用了默认值128MB。现在让我们看看读取大输入的原始阶段在Spark UI中的样子:
按Enter或点击查看完整尺寸图像
如您所见,我们获得了9.5TB的数据作为输入,Spark将其分为约185,000个分区(!),这意味着每个分区约为9.5TB/185,000 = 50MB。输入文件大小约为1GB,在Spark集群中将每个文件分成20个不同的分区对我们来说没有意义。这种分离导致许多网络通信开销,并在此阶段导致更长的延迟。
现在,在将spark.sql.files.maxPartitionBytes参数设置为2GB后(我们操纵Spark读取更大的输入分区,从而在后续阶段处理更大的分区),让我们看看该阶段如何受到影响:
按Enter或点击查看完整尺寸图像
我们的9.5TB被分配到10,000个分区,这比基线运行少了近20倍的分区,导致总时间减少到40分钟,运行时间减少了30%。
现在,让我们看看基线运行的所有最重的输入阶段,其中spark.sql.files.maxPartitionBytes设置为默认值:
按Enter或点击查看完整尺寸图像
将spark.sql.files.maxPartitionBytes设置为2GB后:
按Enter或点击查看完整尺寸图像
如我们所见,此更改降低了输入处理阶段的任务数量,这个简单的参数更改导致这些阶段的运行时间减少了20多分钟。
Spark 3和AQE
为了将我们的作业迁移到Apache Spark 3,必须采取相当多的步骤。我们必须更新代码中的一些语法,并且我们基础设施和应用的每个jar都必须使用更新的Scala版本编译。您可以查看官方迁移指南。
在Spark 3中,添加了使用GPU的能力并启用了AQE优化技术。如上所述,目标是操纵Spark处理大型分区,这意味着应用AQE至少1GB(或减少spark.shuffle.partitions数量)。为了使Spark应用处理1GB的分区,需要配置这些属性:
|
|
如下所示,在我们的用例中,这种做法在运行时间方面是有益的:
我们基线运行中的一个shuffle阶段(无AQE):
按Enter或点击查看完整尺寸图像
具有AQE的shuffle阶段:
按Enter或点击查看完整尺寸图像
在调整候选作业以处理大型分区后,我们检查了集群的利用率,发现它没有被充分利用,因此我们可以尝试减少应用消耗的机器数量。基线作业使用140台机器,在调整Spark和集群节点后,我们最终使用了100台得到相当充分利用的机器。此更改仅略微影响了作业的运行时间,但显著降低了成本!
中间结果:
我们削减了约20%的应用运行时间和约30%的资源,导致成本降低约45%!
例如,如果初始云使用成本为1,000 PYUSD,那么现在我们可能站在约550 PYUSD左右!
CPU运行图表:
总体而言,我们最初的意图是处理大型分区 solely 以从GPU中受益,但如我们所见,即使在使用Spark RAPIDS之前,性能也有显著提升,这令人兴奋!
(免责声明:此实践并非对所有作业都带来相同结果。这取决于数据及其操作。)
到目前为止,我们只是准备了适合Spark RAPIDS和GPU的作业,现在新的挑战开始了——迁移到GPU集群,学习新的调优概念,故障排除和优化GPU使用。
迁移到GPU集群
GPU迁移包括启用Spark RAPIDS初始化脚本,将所有依赖项复制到PayPal的生产环境中,在我们内部基础设施中支持GPU参数,学习我们云供应商的GPU集群功能等。(免责声明:如今,云供应商发布了新的自定义镜像,其中包含内置的Spark RAPIDS实例,因此可以节省这项工作。)
在运行一些简单作业后,确保我们创建了一个稳定可靠的基础设施,其中GPU集群按预期运行Spark RAPIDS,我们深入研究了用它运行候选生产应用。感谢Spark RAPIDS文档,我们排查了在根据需求调整时遇到的少数运行时错误。让我们快速覆盖两个帮助我们更好地理解Spark RAPIDS调优的问题:
无法分配本机内存:std::bad_alloc: RMM失败于:arena.hpp:382: 超过最大池大小
此错误背后的含义是GPU内存池已耗尽。要解决此问题,需要从GPU内存中释放一些压力。在查阅文献后,很明显某些配置对每个作业都至关重要。例如:
spark.rapids.sql.concurrentGpuTasks — 表示GPU并发处理的任务数量。
为了最大化执行性能,我们旨在并行运行尽可能多的任务。我们最初过于雄心勃勃,将此参数设置得太高,并立即收到上述错误。发生这种情况是因为我们使用只有16GB内存的Tesla T4 GPU。作为检查,我们将spark.rapids.sql.concurrentGpuTasks参数设置为1,并注意到没有内存错误。为了正确利用资源,我们必须找到GPU并发参数的最佳点。为了找到这一点,我们查看了GPU利用率指标(我们将在博客后面解释),并旨在将利用率保持在50%左右——NVIDIA团队建议我们这样做,以便在GPU计算与其与主存储器的通信/数据传输之间进行公平分配。在我们的案例中,经过一些试验和错误,我们确定一次运行2个任务,意味着设置spark.rapids.sql.concurrentGpuTasks = 2。
我们遇到的另一个有趣问题与运行时性能和稳定性有关。在将集群中的机器数量从140台减少到30台后,我们的Spark作业比预期慢,并偶尔失败,提示如下:
java.io.IOException: 设备上没有剩余空间
我们更深入地查看了节点,注意到当我们将GPU添加到机器时,我们能够解决计算瓶颈,但“压力”转移到了本地SSD。这是因为我们的GPU内存容量较低,倾向于将内存交换到本地磁盘。我们的Spark计划使用大型分区的事实增加了磁盘溢出。最初,当每个节点有4个SSD(每个375GB)时,我们发现作业比预期慢,有时甚至失败。为了克服此问题,我们将SSD数量加倍至8个,获得了稳定的结果和更好的性能。在云供应商处添加本地SSD相对便宜,因此此解决方案并未真正影响我们的总成本。
所有与本地SSD的交互都比主存储器访问慢得多。此情况的关键参数是:
spark.rapids.memory.host.spillStorageSize — 在溢出到本地磁盘之前,用于缓冲溢出的GPU数据的非堆主机内存量。
将溢出存储参数增加到32GB减少了我们作业的运行时间。
Spark RAPIDS优化:技巧和见解
选择NVIDIA的Tesla T4 GPU: 在NVIDIA的GPU中,我们发现Tesla T4通常对此类计算具有最佳性能/价格比,NVIDIA团队为降低成本目的推荐给我们。(免责声明:新的L4 GPU可能会提供更好的结果。)
考虑内存开销: 请记住,GPU不使用执行器内存,而是使用非堆内存,因此我们必须为每个执行器保证足够的内存开销。我们将内存开销设置为16GB。
调整spark.task.resource.gpu.amount: 此参数限制在执行器上允许并发运行的任务数量,无论这些任务是否使用GPU。起初我们贪婪地尝试为每个执行器分配许多任务。由于过多的I/O和溢出,它减慢了阶段的运行时间。在我们的案例中,我们发现0.0625(1/16)是一个好点。
使用spark.rapids.memory.pinnedPool.size: 固定内存指的是操作系统将保留在系统RAM中且不会重定位或交换到磁盘的内存页。使用固定内存显著提高了GPU和主机内存之间数据传输的性能。我们将此参数设置为8GB。
配置NVME本地SSD: Spark RAPIDS集群中的磁盘配置为使用NVME协议,导致速度提升10%。
凭借更强大的计算能力,我们允许自己挑战集群并减少机器数量。经过一些试验和错误,我们将GPU集群设置为运行30台机器,每台机器具有32核心、120GB RAM、8个SSD和2个Tesla T4 GPU,持续1.3小时。
Spark RAPIDS最终调优
GPU利用率
我们的云供应商提供了一个工具/代理,从GPU VM实例提取指标,如GPU利用率和GPU内存。这使我们能够监控GPU的使用情况,这对于识别未充分利用的GPU和优化工作负载至关重要。
GPU利用率图表
最终成本比较
下面我们可以找到研究发现的总结:
例如,考虑一个成本为1,000 PYUSD的作业,Spark 3与GPU将该成本降低到300 PYUSD。根据配置,您可以通过使用GPU集群处理大量数据享受高达70%的潜在成本节省。
按Enter或点击查看完整尺寸图像
机器硬件的价格已计入成本计算
关键学习
GPU不仅可以有效用于训练AI模型,还可以用于大数据处理。
消耗大量数据以在大型数据集上执行某些SQL操作的Spark作业是使用Spark RAPIDS加速的良好候选者。它们的资格可以通过NVIDIA的资格工具验证。
某些工作负载受益于受计算限制,这可以通过操纵Spark作业以处理大型分区来实现,通过spark.sql.files.maxPartitionBytes和AQE参数。
利用Spark 3与GPU和Spark RAPIDS可以显著降低合格工作负载的云成本。
未来思考
运行Spark RAPIDS与自动扩展GPU集群的潜力受到我们高度重视。由于其主要GPU机器的spot价格低于永久实例,此实践可能显著降低成本。
致谢
感谢Lena Polyak、Neta Golan、Roee Bashary和Tomer Pinchasi对项目成功的重大贡献。非常感谢NVIDIA的Spark RAPIDS团队支持我们。