5个大多数工程师忽略的关键Databricks性能优化技巧
Databricks性能调优不是猜测游戏,它需要对内部机制有深入理解。在本指南中,我将探讨每个数据工程师都应该应用的六个实用优化技术,以实现更快、成本效益更高的生产环境。
1. Databricks中的UDF优化:性能关键
Databricks现实情况
在Databricks Runtime中,UDF仍然是最大的性能瓶颈之一。传统的Python UDF会绕过催化剂优化器和Photon引擎优化(当启用时)。
性能影响
相对性能很大程度上取决于工作负载:
- 内置函数:完整的催化剂优化,有资格进行Photon加速
- Pandas(Arrow)UDF:使用Apache Arrow进行向量化处理,通常比普通Python UDF快得多(基准测试显示加速比从适度的约1.5倍到显著改进不等,具体取决于工作负载)
- 普通Python UDF:在JVM和Python进程之间具有序列化开销的逐行处理,在生产工作负载中可能显著较慢
Databricks最佳实践
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, udf, col
from pyspark.sql.types import IntegerType, LongType
import pandas as pd
# 不好:标准Python UDF - 绕过Catalyst和Photon优化
@udf(returnType=IntegerType())
def calculate_score(value):
return int(value * 1.5 + 100)
df = df.withColumn("score", calculate_score(col("value")))
# 好:使用内置函数 - 完整的Catalyst和Photon优化
df = df.withColumn("score", (col("value") * 1.5 + 100).cast("int"))
# 更好:需要自定义逻辑时使用Pandas UDF - Arrow优化
@pandas_udf(LongType())
def calculate_complex_score(values: pd.Series) -> pd.Series:
return (values * 1.5 + 100).astype('int64')
df = df.withColumn("score", calculate_complex_score(col("value")))
|
Databricks特定配置
1
2
3
4
|
# 为Pandas UDF启用Arrow(在DBR 7.0+中默认启用)
# 需要在驱动程序和执行器上安装PyArrow库
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
|
重要说明:Arrow优化需要在驱动程序和执行器节点上都安装Apache Arrow库。Databricks Runtime默认包含此库。
2. 使用AQE在Databricks中调整Shuffle分区
默认问题
Spark使用spark.sql.shuffle.partitions = 200作为默认值,对于生产工作负载来说,这很少是最优的。
此设置影响所有shuffle操作,包括连接、groupBy和聚合。
Databricks的经验法则
实际目标:对于许多工作负载,在shuffle后每个分区大约为128 MB。
这是一个指导原则,不是严格要求。实际的最佳大小取决于您的集群配置和数据特征。
示例场景:
- 10GB shuffle:大约80个分区(10,000 MB/128 MB)
- 1TB shuffle:大约8,000个分区(1,000,000 MB/128 MB)
- 100MB shuffle:大约1个分区(100 MB/128 MB)
Databricks AQE优势
Databricks Runtime包含增强的自适应查询执行(AQE),可根据运行时实际的shuffle数据大小自动优化分区计数。这消除了大部分手动调优负担。
配置
1
2
3
4
5
6
7
8
9
10
11
12
|
# 启用AQE(推荐用于所有Databricks作业)
# 在Spark 3.2+和Databricks Runtime 9.1+中默认启用
spark.conf.set("spark.sql.adaptive.enabled", "true")
# 动态分区合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# Delta Lake的Databricks优化设置
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
|
实际示例
1
2
3
4
5
6
|
# 之前:200个分区创建200个小文件
df.write.format("delta").save("/mnt/data/table") # 200个文件,每个5MB
# 之后:AQE合并到最优计数
spark.conf.set("spark.sql.adaptive.enabled", "true")
df.write.format("delta").save("/mnt/data/table") # 约10个文件,每个100MB
|
3. Delta Lake的文件大小优化
Databricks中的小文件问题
由于以下原因,Delta Lake性能会因小文件而降低:
- 事务日志中的元数据操作增加
- 更多的云存储API调用(S3/ADLS/GCS)
- 启用Photon时效率降低
- 较慢的OPTIMIZE操作
实际目标文件大小
基于Databricks文档和生产经验[4],这些是实际目标(不是硬性限制):
- 实际最小值:每个文件128MB
- 常见目标范围:每个文件256MB - 1GB
- 避免超过:每个文件2GB(收益递减和潜在内存问题)
注意:Databricks可能在Unity Catalog管理的表和较新的运行时中自动调整这些目标。
检测查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
-- 检查Delta表中的文件大小
DESCRIBE DETAIL delta.'/mnt/data/table';
-- 分析文件分布
SELECT
CASE
WHEN size < 134217728 THEN 'Small (<128MB)'
WHEN size < 1073741824 THEN 'Good (128MB-1GB)'
ELSE 'Large (>1GB)'
END as size_category,
COUNT(*) as file_count
FROM (
SELECT explode(split(location, '/')) as path, size
FROM (DESCRIBE DETAIL delta.'/mnt/data/table')
)
GROUP BY size_category;
|
Databricks中的解决方案
1. 自动优化(推荐用于Databricks)
1
2
3
4
5
6
7
8
9
10
11
|
# 在表级别启用(Databricks特定功能)
spark.sql("""
ALTER TABLE my_table SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# 或为所有写入在会话级别设置
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
|
注意:自动优化在写入期间会产生额外的计算成本,但显著提高了读取性能。
2. 使用Z-ORDER手动OPTIMIZE
1
2
3
4
5
6
7
8
9
|
-- 压缩小文件(在非高峰时段安排)
OPTIMIZE delta.'/mnt/data/events';
-- 使用Z-ORDER对特定列进行查询性能优化
OPTIMIZE delta.'/mnt/data/events'
ZORDER BY (customer_id, event_date);
-- 检查优化影响
DESCRIBE HISTORY delta.'/mnt/data/events';
|
最佳实践:按计划(每日/每周)运行OPTIMIZE,而不是在每次写入后运行,以平衡计算成本。
3. 写入前合并
1
2
3
4
5
6
|
# 基于数据大小计算最优分区
data_size_gb = 50 # 估计或计算实际大小
target_file_size_gb = 0.5 # 500MB文件
num_files = max(1, int(data_size_gb / target_file_size_gb))
df.coalesce(num_files).write.format("delta").mode("overwrite").save("/mnt/data/table")
|
4. Databricks中的广播变量
何时使用
在Apache Spark中,当我们需要在所有执行器之间共享只读数据而不需要在每个任务中发送时,广播变量非常关键。这在Databricks中非常重要,因为驱动程序和执行器之间的网络带宽可能成为瓶颈。
常见用例
- 查找表:小维度表(通常<10MB)
- 配置映射:用于转换的键值对
- ML模型参数:共享模型系数
- 业务规则:映射字典
Databricks最佳实践
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# 不好:每个任务发送大字典
lookup_dict = {"key1": "value1", "key2": "value2"} # 假设5MB字典
def apply_lookup(key):
return lookup_dict.get(key, "unknown")
udf_lookup = udf(apply_lookup, StringType())
df = df.withColumn("mapped_value", udf_lookup(col("key")))
# 字典与每个任务一起序列化 - 1000个任务 = 5GB网络传输
# 好:广播一次,所有执行器共享
lookup_dict = {"key1": "value1", "key2": "value2"}
broadcast_lookup = spark.sparkContext.broadcast(lookup_dict)
def apply_lookup_broadcast(key):
return broadcast_lookup.value.get(key, "unknown")
udf_lookup = udf(apply_lookup_broadcast, StringType())
df = df.withColumn("mapped_value", udf_lookup(col("key")))
# 完成后清理以释放内存
broadcast_lookup.unpersist()
|
实际示例:货币转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
from pyspark.sql.types import StringType
# 汇率查找(小数据集)
exchange_rates = {
"USD_EUR": 0.85,
"USD_GBP": 0.73,
"USD_JPY": 110.0,
# ... 100个货币对,总共约1KB
}
# 广播到所有执行器
bc_rates = spark.sparkContext.broadcast(exchange_rates)
# 使用Pandas UDF进行转换(更好的性能)
@pandas_udf("double")
def convert_currency(amounts: pd.Series, pairs: pd.Series) -> pd.Series:
rates = bc_rates.value
return amounts * pairs.map(lambda p: rates.get(p, 1.0))
df_converted = df.withColumn(
"amount_usd",
convert_currency(col("amount"), col("currency_pair"))
)
# 清理
bc_rates.unpersist()
|
性能影响
- 无广播:数据大小 × 任务数量 = 总网络传输
- 有广播:数据大小 × 执行器数量 = 总网络传输
对于具有100个执行器上的1000个任务的5MB查找:
- 无广播:5MB × 1000 = 5GB传输
- 有广播:5MB × 100 = 500MB传输(减少10倍)
5. Databricks中的统计信息和基于成本的优化
为什么统计信息很重要
Delta Lake和Databricks使用统计信息通过以下方式优化查询计划:
- 更好的连接策略选择(广播vs shuffle)
- 多路连接的最优连接顺序
- 文件级别的分区修剪
- 基于最小/最大值的数跳过
Delta Lake中的统计信息
Delta Lake自动收集统计信息:
- 最小/最大值:每个文件的每列
- 行计数:每个文件的总行数
- 空值计数:每个文件的每列的空值数
高级:用于CBO的表级统计信息
1
2
3
4
5
6
7
8
9
10
11
12
|
-- 收集全面的表统计信息(基于成本的优化器输入)
ANALYZE TABLE my_table COMPUTE STATISTICS;
-- 收集特定列的列级统计信息
-- 注意:这会扫描表并产生计算成本
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS customer_id, order_date, amount;
-- 查看表统计信息
DESCRIBE EXTENDED my_table;
-- 查看列统计信息
DESCRIBE EXTENDED my_table customer_id;
|
实际影响
1
2
3
4
5
6
7
8
|
# 无统计信息:Spark的基于成本的优化器进行估计
result = large_table.join(medium_table, "id") # 可能选择次优策略 - 降低性能
# 有统计信息:CBO具有准确基数估计
spark.sql("ANALYZE TABLE large_table COMPUTE STATISTICS FOR COLUMNS id")
spark.sql("ANALYZE TABLE medium_table COMPUTE STATISTICS FOR COLUMNS id")
result = large_table.join(medium_table, "id") # 选择最优策略
|
何时运行ANALYZE
在以下情况下安排ANALYZE TABLE:
- 大量数据加载后(>表大小的20%)
- 重大模式更改后
- 运行关键生产查询前
- 对于频繁更新的表按定期计划(例如每周)
成本考虑:ANALYZE需要全表扫描。在非高峰时段安排,并在成本与查询优化收益之间取得平衡。
6. Databricks中的Photon引擎(可选但强大)
什么是Photon?
Photon是一个用C++编写的Databricks原生向量化查询引擎,可加速SQL和DataFrame操作。它为兼容的工作负载提供了显著的性能改进,但必须显式启用。
如何启用Photon
Photon不是通过笔记本中的spark.conf.set()启用的。而是通过以下方式启用:
选项1:集群UI
- 转到Compute > Create Cluster
- 选中"Use Photon Acceleration"复选框
- 选择兼容的运行时(DBR 9.1 LTS或更高版本)
选项2:Jobs API / Terraform
1
2
3
4
5
|
{
"runtime_engine": "PHOTON",
"spark_version": "11.3.x-photon-scala2.12",
"node_type_id": "i3.xlarge"
}
|
要求和兼容性
- 运行时:Databricks Runtime 9.1 LTS或更高版本
- 实例类型:大多数计算优化和内存优化实例
- 工作负载类型:最适合SQL/DataFrame操作、聚合、连接
- 未优化:RDD操作、Python UDF、有状态操作的流处理
成本考虑
启用Photon的集群可能具有不同的DBU定价。始终测试性能改进与成本增加,以验证工作负载的投资回报率。
性能基准测试
观察到的改进因工作负载而异:
- 聚合:通常快2-5倍
- 大表连接:可以看到2-4倍的改进
- 复杂SQL查询:可变,对您的特定查询进行基准测试
始终进行A/B测试:在您的实际数据上运行相同作业,使用和不使用Photon,以衡量影响。
结论
Databricks生产清单
新作业的基本配置
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# 启用所有AQE功能(Spark 3.2+ / DBR 9.1+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Delta Lake写入优化(Databricks特定)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Pandas UDF的Arrow(需要PyArrow库)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# 注意:Photon必须在集群级别启用,而不是通过spark.conf.set()
|
性能监控查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
-- 监控Delta表健康状况
SELECT
'table_name' as table_name,
num_files,
size_in_bytes / 1024 / 1024 / 1024 as size_gb,
num_files / GREATEST(size_in_bytes / 1024 / 1024 / 1024, 1) as files_per_gb,
CASE
WHEN num_files / GREATEST(size_in_bytes / 1024 / 1024 / 1024, 1) > 10
THEN 'ALERT: Too many small files'
ELSE 'OK'
END as health_status
FROM (
SELECT
COUNT(*) as num_files,
SUM(size) as size_in_bytes
FROM (DESCRIBE DETAIL delta.'/mnt/data/your_table')
);
|
环境基准测试模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import time
def benchmark_operation(df, operation_name, operation_func):
"""
简单的基准测试函数,用于测量Spark操作性能
"""
# 预热
operation_func(df).count()
# 实际基准测试
start_time = time.time()
result = operation_func(df).count()
end_time = time.time()
duration = end_time - start_time
print(f"{operation_name}: {duration:.2f} seconds")
return duration
# 示例用法比较内置函数与UDF
from pyspark.sql.functions import col, year as spark_year
# 创建示例数据
df = spark.range(0, 10000000).withColumn("date", F.current_date())
# 测试1:内置函数
def builtin_test(df):
return df.withColumn("year", spark_year(col("date")))
# 测试2:Python UDF
@udf(returnType=IntegerType())
def extract_year_udf(date_val):
return date_val.year if date_val else None
def udf_test(df):
return df.withColumn("year", extract_year_udf(col("date")))
# 测试3:Pandas UDF
@pandas_udf(IntegerType())
def extract_year_pandas(dates: pd.Series) -> pd.Series:
return dates.dt.year
def pandas_udf_test(df):
return df.withColumn("year", extract_year_pandas(col("date")))
# 运行基准测试
print("Performance Comparison on 10M rows:")
builtin_time = benchmark_operation(df, "Built-in function", builtin_test)
pandas_time = benchmark_operation(df, "Pandas UDF", pandas_udf_test)
python_time = benchmark_operation(df, "Python UDF", udf_test)
print(f"\nSpeedup ratios:")
print(f"Pandas UDF vs Python UDF: {python_time/pandas_time:.2f}x")
print(f"Built-in vs Python UDF: {python_time/builtin_time:.2f}x")
|
总结:五个关键缺失策略
这5个策略解决了重要的性能差距:
- UDF优化 - 理解性能层次结构(内置 > Pandas UDF > Python UDF)并在具有自定义逻辑的工作负载中使用Arrow优化的UDF可以产生显著的性能改进。
- Shuffle分区调优 - 利用自适应查询执行(AQE)根据运行时实际的shuffle数据大小自动优化分区计数,消除了默认200个分区设置的猜测工作。以每个分区大约128MB作为实际指导原则。
- 文件大小优化 - Delta Lake的自动优化和手动OPTIMIZE命令解决了小文件问题,该问题会降低读取和写入性能。对于大多数工作负载,目标为256MB-1GB文件。
- 广播变量 - 在执行器之间高效共享只读查找数据,在许多生产场景中将网络传输从千兆字节减少到兆字节。
- 统计信息和CBO - 运行ANALYZE TABLE为基于成本的优化器提供准确的基数估计,从而获得更好的连接策略和查询计划。
版本兼容性说明
- Databricks运行时:11.0 LTS或更高(推荐:13.3 LTS)
- Apache Spark:3.3或更高
- Delta Lake:2.0或更高
性能测试最佳实践
始终在您的特定环境中使用您的实际数据模式对优化进行基准测试。性能改进因以下因素而异:
- 集群配置(实例类型、集群大小)
- 数据特征(大小、分布、模式)
- 查询模式和复杂性
- 并发级别
快速参考命令
启用所有推荐设置
1
2
3
4
5
6
7
|
# 复制粘贴生产作业的起始配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
|
常见诊断查询
1
2
3
4
5
6
7
8
9
10
11
|
-- 检查表文件计数和大小
DESCRIBE DETAIL delta.'/mnt/data/table_name';
-- 查看表优化历史
DESCRIBE HISTORY delta.'/mnt/data/table_name';
-- 检查是否收集了统计信息
DESCRIBE EXTENDED table_name column_name;
-- 查看当前Spark配置
SET -v;
|
优化维护计划
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
# 频繁更新的表的每周OPTIMIZE
spark.sql("""
OPTIMIZE delta.'/mnt/data/high_traffic_table'
ZORDER BY (user_id, event_date)
""")
# 维度表的每月ANALYZE
spark.sql("""
ANALYZE TABLE dimension_table
COMPUTE STATISTICS FOR COLUMNS id, category, region
""")
# 清理旧版本(7天保留期后)
spark.sql("""
VACUUM delta.'/mnt/data/table_name' RETAIN 168 HOURS
""")
|