生产级大数据:Databricks中的Apache Spark作业及更多
本文基于现有扩展Apache Spark工作负载的经验,通过保留八个最重要的策略,并将高价值但次要的策略(如优先使用窄转换、应用代码级最佳实践、利用Databricks Runtime功能和优化集群配置)移至杂项部分,从而不失去对洗牌和内存等影响领域的关注,同时仍能彻底解决这些问题。
分阶段洞察的图表和示例代码可以在Databricks或原生Spark会话中完全执行,所有这些都值得您投入时间,应用程序将带来难以置信的性能优势,在实际管道中通常达到5-20倍的范围。
优化策略
1. 分区和并行化
策略:在像连接这样的洗牌密集型操作之前使用repartition()来增强并行性,并在写入前使用coalesce()来最小化分区,防止小文件问题冲击存储元数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
spark = SparkSession.builder.appName("PartitionExample").getOrCreate()
# 示例DataFrame创建
data = [(i, f"val_{i}") for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "value"])
# 在连接或聚合之前重新分区以增强并行性
df_repartitioned = df.repartition(200, "id") # 洗牌到200个均匀分区
# 执行示例操作(例如groupBy)
aggregated = df_repartitioned.groupBy("id").count()
# 在写入之前合并以减少输出文件
aggregated_coalesced = aggregated.coalesce(10)
aggregated_coalesced.write.mode("overwrite").parquet("/tmp/output")
print(f"重新分区后的分区数: {df_repartitioned.rdd.getNumPartitions()}")
print(f"合并后的分区数: {aggregated_coalesced.rdd.getNumPartitions()}")
|
解释:分区是Spark分布式模型中任务并行化和负载均衡的基础。repartition(n)通过完全洗牌确保数据均匀分布,适合在连接前使用以避免执行器过载。coalesce(m)(其中m < 当前分区数)在本地合并以实现高效写入,减少Databricks的Delta或S3中的I/O成本。
风险:过度重新分区会增加洗牌开销;通过Spark UI的"输入大小"指标进行监控。好处:可扩展至TB级数据;跨Spark环境通用。
2. 缓存和持久化
策略:在迭代或多用途场景中缓存或持久化可重用的DataFrame,以避免重新计算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
spark = SparkSession.builder.appName("CachingExample").getOrCreate()
# 创建示例DataFrame
df = spark.range(1000000).withColumn("squared", spark.range(1000000).id ** 2)
# 缓存到内存(默认)
df.cache()
print("第一次计算(有效未缓存,但设置缓存):", df.count())
# 重用:第二次更快
print("第二次计算(从缓存):", df.count())
# 使用自定义级别持久化(例如内存和磁盘)
df.persist(StorageLevel.MEMORY_AND_DISK)
print("持久化计数:", df.count())
# 清理
df.unpersist()
|
解释:在循环或DAG分支中,重新计算会严重影响性能。cache()使用MEMORY_ONLY;persist()允许像MEMORY_AND_DISK这样的级别以应对溢出恢复。在Databricks中,这利用了快速的NVMe;注意内存使用以避免驱逐。
好处:在ML训练中速度提升高达10倍。
风险:内存耗尽 – 使用spark.ui进行跟踪。
3. 谓词下推
策略:尽早过滤以利用存储级剪枝,特别是使用Parquet/Delta时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("PushdownExample").getOrCreate()
# 从Parquet读取(支持下推)
df = spark.read.parquet("/tmp/large_dataset.parquet") # 假设预写入的大文件
# 早期过滤:下推到存储
filtered_df = df.filter(col("value") > 100).filter(col("category") == "A")
# 进一步操作:减少数据洗牌
result = filtered_df.groupBy("category").sum("value")
result.show()
# 比较执行计划
df.explain() # 无过滤
filtered_df.explain() # 可见下推
|
解释:下推在源端跳过不相关数据,大幅减少读取。Delta Lake通过统计信息增强;通用但依赖于格式(Parquet支持,JSON不支持)。
好处:节省网络。
风险:过度过滤隐藏数据问题。
4. 倾斜处理
策略:对键加盐或自定义分区以均衡分布。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, rand, floor
spark = SparkSession.builder.appName("SkewExample").getOrCreate()
# 倾斜的DataFrame
skewed_df = spark.createDataFrame([(i % 10, i) for i in range(1000000)], ["key", "value"]) # 低键上有许多重复
# 对键加盐:附加随机后缀(0-9)
salted_df = skewed_df.withColumn("salted_key", concat(col("key"), lit("_"), floor(rand() * 10).cast("string")))
# 在加盐键上分组,然后聚合
temp_agg = salted_df.groupBy("salted_key").sum("value")
# 移除盐以获得最终结果
final_agg = temp_agg.withColumn("original_key", col("salted_key").substr(1, 1)).groupBy("original_key").sum("sum(value)")
final_agg.show()
|
解释:倾斜会导致执行器饿死;加盐暂时分散热键。自定义分区器(通过RDD)提供精确控制。检查UI任务时间。
好处:均衡执行。
风险:加盐带来额外计算。
5. 优化写入操作
策略:明智地进行分桶/分区,合并文件,使用Delta的Optimize/Z-Order。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WriteOptExample").getOrCreate()
# 示例DataFrame
df = spark.range(1000000).withColumn("category", (spark.range(1000000).id % 10).cast("string"))
# 按列分区以提高查询效率
df.write.mode("overwrite").partitionBy("category").parquet("/tmp/partitioned")
# 对于Delta:写入后优化
df.write.format("delta").mode("overwrite").save("/tmp/delta_table")
spark.sql("OPTIMIZE delta.`/tmp/delta_table` ZORDER BY (id)")
# 写入前合并
df.coalesce(5).write.mode("overwrite").parquet("/tmp/coalesced")
|
解释:写入会导致文件爆炸;合并可以整合。Delta的Z-Order为扫描聚类;
好处:读取更快;Databricks特有但可通过Hive移植。
6. 利用自适应查询执行(AQE)
策略:启用AQE以进行运行时调整,如自动倾斜处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AQEExample").getOrCreate()
# 启用AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# 受益于AQE的示例连接(如果小则自动广播)
large_df = spark.range(1000000)
small_df = spark.range(100)
result = large_df.join(small_df, large_df.id == small_df.id)
result.explain() # 显示自适应计划
result.show()
|
解释:AQE根据后统计信息调整(例如减少分区);好处:无需手动优化;Spark 3+通用。
7. 作业和阶段优化
策略:通过Spark UI洞察进行调整,调整内存/并行度。
1
2
3
4
5
6
7
8
9
10
11
12
|
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TuneExample") \
.config("spark.executor.memory", "4g") \
.config("spark.sql.shuffle.partitions", "100") \
.getOrCreate()
# 示例作业
df = spark.range(10000000).groupBy("id").count()
df.write.mode("overwrite").parquet("/tmp/tuned")
# 运行后,检查UI的GC/阶段;迭代调整配置
|
解释:UI标记GC(>10%不好);调整shuffle.partitions以匹配核心数。
好处:资源效率;通用。
8. 使用广播哈希连接(BHJ)优化连接
策略:广播小端以消除洗牌。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("BHJExample").getOrCreate()
# 大和小DataFrame
large_df = spark.range(1000000).toDF("key")
small_df = spark.range(100).toDF("key")
# 广播小的以进行BHJ
result = large_df.join(broadcast(small_df), "key")
result.explain() # 显示BroadcastHashJoin
result.show()
|
解释:BHJ将小DF复制到节点;调整spark.sql.autoBroadcastJoinThreshold。
好处:无洗牌。
风险:广播内存占用。
杂项策略
这些额外技术补充了核心集,为特定场景提供针对性增强。虽然不总是基础性的,但它们可以在代码效率、平台特定加速和基础设施调优方面提供显著提升。
优先使用窄转换
策略:优先使用像filter()和select()这样的窄转换,而不是像groupBy()或join()这样的宽转换。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("NarrowExample").getOrCreate()
# 示例大DataFrame
df = spark.range(1000000).withColumn("value", spark.range(1000000).id * 2)
# 窄:先过滤和选择(无洗牌)
narrow_df = df.filter(col("value") > 500000).select("id")
# 然后宽:GroupBy(仅在减少的数据上洗牌)
result = narrow_df.groupBy("id").count()
result.show()
|
解释:窄操作按分区处理,避免洗牌;尽早链接它们以剪枝。好处:较低开销
风险:过度链接增加代码复杂性。
代码级最佳实践
策略:使用select()明确指定列,避免使用*。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CodeBestExample").getOrCreate()
# 示例宽表
df = spark.createDataFrame([(1, "A", 100, "extra1"), (2, "B", 200, "extra2")], ["id", "category", "value", "unused"])
# 不好:选择所有(*)
all_df = df.select("*") # 加载不必要的列
# 好:选择特定的
slim_df = df.select("id", "category", "value")
# 处理:使用更少内存
result = slim_df.filter(col("value") > 150)
result.show()
|
解释:*加载额外内容,增加内存;select()修剪。
好处:更精简的管道;风险:在演进模式中缺失列。
利用Databricks Runtime功能
策略:利用Delta Cache和Photon进行I/O和计算加速。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RuntimeFeaturesExample").getOrCreate()
# 假设启用了Photon的Databricks Runtime
spark.conf.set("spark.databricks.delta.cache.enabled", "true") # Delta Cache
# 读取Delta(自动缓存)
df = spark.read.format("delta").load("/tmp/delta_table")
# 查询:受益于缓存/Photon向量化
result = df.filter(col("value") > 100).groupBy("category").sum("value")
result.show()
|
解释:Delta Cache在本地预加载;Photon向量化。
好处:延迟下降;仅限Databricks,其他地方用手动缓存模拟。
为大数据优化集群配置
策略:选择实例类型并启用自动缩放。例如,AWS EMR等。
1
2
3
4
5
6
7
8
9
10
11
12
|
# 这是通过Databricks UI/CLI配置的,不是代码,但示例作业配置:
# 在Databricks笔记本或作业设置中:
# 集群:启用自动缩放,最小2-最大10个工作节点
# 实例:i3.xlarge(存储优化)或r5.2xlarge(内存优化)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ClusterOptExample").getOrCreate()
# 运行繁重作业:自动缩放处理负载
df = spark.range(100000000).groupBy("id").count() # 自动扩展
df.show()
|
解释:匹配实例到工作负载(例如内存用于连接);自动缩放适应。
好处:节省成本;Databricks特有,但可以应用于AWS EMR等,在集群引导期间使用实例配置JSON的自动和管理缩放。
对Databricks和其他Spark环境的适用性
通用:其中一些方法适用于EMR、Synapse和其他Spark平台,如分区、缓存、谓词下推、倾斜处理技术、窄转换、编码实践、AQE、作业优化和BHJ。
Databricks特有:使用Delta的写入操作、Runtime中的功能、集群配置(和配置更改)都是Databricks原生的(但可以使用像Iceberg这样的替代品或一些手动调优来利用)。
结论
在本文中,我试图演示八个核心策略,这些策略构成了解决洗牌、内存和I/O瓶颈以及提高效率的基础。杂项部分描述了一些微妙的改进方法、平台特定改进和基础设施调优。您现在在工作负载中具有灵活性和可变性,包括即席查询和生产ETL管道。总的来说,这12种策略(核心和杂项)促进了一种整体思考优化的方式。首先在Spark UI中进行分析,使用此处提供的代码片段自适应地实施增量改进,并进行详尽基准测试以展示改进(使用每个指标)。通过在Databricks中应用这些技术,您不仅将降低成本和延迟,还将构建可扩展、有弹性的大数据工程解决方案。
随着Spark开发(2025年趋势)的不断扩展,请重新访问此参考和新工具,如MLflow,用于实验能力,将瓶颈转化为突破。