0%

spark优化

spark性能优化全指南:从算法到资源的全方位调优

Spark 作为分布式计算引擎,其性能受算法设计、资源配置、数据分布等多因素影响。不合理的实现或配置可能导致任务运行缓慢、资源浪费甚至失败。本文从算法优化、并行度调整、缓存策略、内存管理、数据倾斜处理等维度,系统讲解 Spark 性能优化的核心方法,帮助开发者将任务效率提升数倍。

算法性能优化:选择高效的算子与数据处理方式

Spark 算子的选择直接影响任务的 Shuffle 数据量和计算复杂度,合理使用算子可减少不必要的开销。

避免低效算子,优先选择聚合类算子

低效算子 高效替代方案 优化原理
groupByKey reduceByKey / aggregateByKey groupByKey 仅分组不聚合,会将所有数据 Shuffle 到 Reduce 端;reduceByKey 在 Map 端先预聚合,减少 Shuffle 数据量
flatMap + groupByKey combineByKey combineByKey 支持自定义 Map 端聚合逻辑,灵活控制中间结果
cartesian(笛卡尔积) 避免使用,或通过过滤提前减少数据量 笛卡尔积会产生 N*M 条数据,极易导致 OOM,仅在小数据集场景使用

使用分区级算子减少任务启动开销

  • mapPartitions 替代 map
    map 对每条数据执行一次函数调用,mapPartitions 对每个分区执行一次函数调用(处理整个分区数据),减少函数初始化开销(如数据库连接创建)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 低效:每条数据创建一次连接  
    rdd.map(record => {
    val conn = getDBConnection() // 重复创建连接
    process(record, conn)
    })

    // 高效:每个分区创建一次连接
    rdd.mapPartitions(iterator => {
    val conn = getDBConnection() // 分区内共享连接
    iterator.map(record => process(record, conn))
    })
  • foreachPartition 替代 foreach:类似 mapPartitions,适合批量输出场景(如批量写入数据库)。

数据倾斜的算法级优化

数据倾斜(某 Key 对应数据量远超其他 Key)会导致单个 Task 耗时过长,可通过加盐散列分散倾斜 Key:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 示例:对倾斜 Key 加盐分散  
val skewedKeys = List("hot_key1", "hot_key2") // 已知的倾斜 Key

// 步骤 1:对倾斜 Key 添加随机前缀(0-9),非倾斜 Key 保持不变
val saltedRDD = rdd.map { case (key, value) =>
if (skewedKeys.contains(key)) {
val salt = (new scala.util.Random).nextInt(10)
(s"$salt|$key", value) // 加盐后 Key:"1|hot_key1"
} else {
(key, value)
}
}

// 步骤 2:聚合后去除前缀
val result = saltedRDD
.reduceByKey(_ + _) // 分散到 10 个 Task 处理
.map { case (key, sum) =>
if (key.contains("|")) (key.split("\\|")(1), sum) // 去除前缀
else (key, sum)
}
.reduceByKey(_ + _) // 二次聚合合并结果

并行度优化:合理设置任务数量

并行度(Task 数量)决定了 Spark 任务的并行执行能力,并行度过低会导致资源浪费,过高则增加任务调度开销。

并行度的核心参数

参数 作用 默认值 调优建议
spark.default.parallelism Shuffle 操作的默认并行度(如 reduceByKeygroupByKey 集群总核数(total cores 设为 集群总核数的 2~3 倍(如 100 核集群设为 200~300)
spark.sql.shuffle.partitions Spark SQL 中 Shuffle 操作的并行度 200 根据数据量调整,每分区数据量建议为 128MB~256MB

手动调整 RDD 分区数

通过算子显式指定分区数,避免默认分区不合理:

1
2
3
4
5
6
7
8
9
// 创建 RDD 时指定分区数  
val rdd = sc.textFile("hdfs:///data.txt", minPartitions = 100) // 至少 100 个分区

// Shuffle 时指定分区数
val shuffledRDD = rdd.reduceByKey(_ + _, numPartitions = 200) // 显式设置 200 个分区

// 重分区(增加或减少分区)
val repartitionedRDD = rdd.repartition(150) // 会触发 Shuffle
val coalescedRDD = rdd.coalesce(50) // 减少分区,不触发 Shuffle(仅合并相邻分区)

并行度优化原则

  • 目标:每个 Task 处理的数据量控制在 128MB~256MB,避免过小(调度开销大)或过大(OOM 风险);
  • 数据量估算:总数据量 ÷ 目标分区数据量 = 建议分区数(如 100GB 数据 → 100GB / 256MB ≈ 400 分区);
  • 资源匹配:并行度不宜超过集群总核数的 3 倍(如 100 核集群最大并行度 300)。

缓存优化:减少重复计算

Spark 中重复使用的 RDD 若不缓存,会被反复计算(尤其复杂转换链),缓存可将中间结果存储在内存或磁盘,显著减少计算时间。

缓存算子与存储级别

算子 作用 常用存储级别
cache() 简写,默认存储级别为 MEMORY_ONLY 适用于小数据集,全内存存储
persist(StorageLevel) 自定义存储级别 灵活选择内存 + 磁盘、序列化等策略

存储级别选择策略

存储级别 描述 适用场景
MEMORY_ONLY 纯内存,不序列化 数据集小,内存充足,优先选择
MEMORY_AND_DISK 内存不足时写入磁盘 数据集较大,内存不足以完全存储
MEMORY_ONLY_SER 内存存储,序列化数据 内存紧张,通过序列化减少内存占用(CPU 换内存)
MEMORY_AND_DISK_SER 序列化存储,内存不足写磁盘 大数据集,内存有限,容忍一定 CPU 开销

缓存使用最佳实践

  • 缓存重复使用的 RDD:如多次 Join 或聚合的中间结果;

  • 避免缓存临时数据:仅使用一次的 RDD 无需缓存,浪费资源;

  • 及时释放缓存:不再使用的缓存通过unpersist()释放:

    1
    2
    3
    val cachedRDD = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  
    // 使用缓存 RDD 进行计算...
    cachedRDD.unpersist() // 释放资源
  • 序列化缓存:大数据集建议使用序列化存储(如 MEMORY_ONLY_SER),减少内存占用(序列化后数据量可减少 50%~70%)。

内存优化:高效利用内存资源

Spark 内存管理直接影响任务稳定性和执行效率,不合理的内存配置会导致 OOM 或 GC 频繁。

内存分配核心参数

参数 作用 调优建议
spark.executor.memory 每个 Executor 的总内存 根据任务类型调整:批处理设为 8~16GB,机器学习设为 16~32GB
spark.driver.memory Driver 内存 小数据集设为 2~4GB,大结果集 collect() 设为 8~16GB
spark.memory.fraction 用于执行和存储的内存占比(总内存 - 预留内存) 默认 0.6,内存密集型任务可设为 0.7~0.8
spark.memory.storageFraction 存储内存(缓存)占执行 / 存储总内存的比例 默认 0.5,缓存密集型任务可设为 0.6~0.7

减少内存占用的技巧

  • 使用高效数据结构

    • 避免 Java 包装类型(如 Integer 改用 int),使用原始类型数组;
    • 小数据集用 Array 替代 List,减少对象引用开销;
    • 复杂结构用 Tuple 或自定义 case class,避免嵌套 Map
  • 广播大变量

    对于只读大对象(如字典表、模型参数),使用广播变量(Broadcast)将对象仅发送到每个 Executor 一次,而非每个 Task 一次:

1
2
3
4
5
6
7
val largeMap = Map(...)  // 大对象(如 1GB)  
val broadcastMap = sc.broadcast(largeMap) // 广播变量

rdd.map(record => {
val value = broadcastMap.value.get(record.key) // Executor 共享一个副本
// 处理逻辑...
})

GC 优化:减少垃圾回收停顿

  • 使用 G1 垃圾收集器:适合大内存场景,减少 Full GC 时间:

    1
    2
    # 在 spark-env.sh 中添加  
    export SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=200"
  • 调整新生代与老生代比例:新生代设为总内存的 1/3~1/2,减少对象晋升到老生代的频率;

  • 避免内存碎片:序列化存储 RDD(_SER 存储级别),减少小对象数量。

资源配置优化:合理分配集群资源

Spark 任务的资源配置(Executor 数量、内存、核数)直接影响并行能力和稳定性,资源不足会导致任务排队,资源过剩则造成浪费。

核心资源参数

参数 作用 调优建议
--num-executors Executor 数量 集群节点数 × 2~3(如 10 节点集群设为 20~30)
--executor-cores 每个 Executor 的核数 2~5 核(核数过多会导致 GC 压力增大)
--executor-memory 每个 Executor 的内存 核数 × 4~8GB(如 4 核 Executor 设为 16~32GB)
--driver-memory Driver 内存 2~16GB(取决于 collect() 结果大小和广播变量大小)

资源配置公式(示例)

假设集群有 5 个节点,每个节点 16 核、64GB 内存:

  • Executor 数量:5 节点 × 3 = 15 个
  • 每个 Executor 核数:4 核(15×4=60 核,充分利用 5×16=80 核,预留部分资源)
  • 每个 Executor 内存:4 核 × 8GB = 32GB(15×32=480GB,5×64=320GB?此处需按实际节点内存调整,避免超配)

YARN 模式资源配置示例

通过 spark-submit 提交任务时指定资源:

1
2
3
4
5
6
7
8
9
10
spark-submit \  
--class com.example.MyJob \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \ # 10 个 Executor
--executor-cores 4 \ # 每个 Executor 4 核
--executor-memory 16g \ # 每个 Executor 16GB 内存
--driver-memory 4g \ # Driver 4GB 内存
--conf spark.default.parallelism=80 \ # 并行度 = 10×4×2
myjob.jar

数据倾斜处理:解决任务长尾问题

数据倾斜是 Spark 任务性能的常见瓶颈,表现为大部分 Task 快速完成,少数 Task 耗时过长(进度卡在 99%)。

1. 数据倾斜的识别

  • Web UI 观察:在 Spark UI 的 Stages 页面查看每个 Task 的执行时间,若某 Task 时间远超其他(如 10 分钟 vs 1 分钟),则存在倾斜;
  • Shuffle 数据量分析:查看 Stage 详情的 Shuffle Read 指标,若某 Task 读取数据量远超其他(如 10GB vs 100MB),则为倾斜 Key。

2. 数据倾斜的解决方案

倾斜场景 解决方案
Join 倾斜(某 Key 关联数据过多) 小表广播(MapJoin)、大表加盐、过滤倾斜 Key 单独处理
GroupBy 倾斜(某 Key 聚合数据过多) Map 端预聚合、加盐后二次聚合、拆分倾斜 Key 单独聚合
读写倾斜(某分区文件过大) 重分区(repartition)、小文件合并(coalesce

3. 倾斜 Key 单独处理示例

对于已知的倾斜 Key,可拆分处理逻辑,避免影响整体任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 假设 "hot_key" 是倾斜 Key  
val skewedKey = "hot_key"

// 步骤 1:拆分 RDD 为倾斜部分和非倾斜部分
val skewedRDD = rdd.filter(_._1 == skewedKey)
val nonSkewedRDD = rdd.filter(_._1 != skewedKey)

// 步骤 2:非倾斜部分正常聚合
val nonSkewedResult = nonSkewedRDD.reduceByKey(_ + _)

// 步骤 3:倾斜部分单独处理(加盐后聚合)
val skewedResult = skewedRDD
.map { case (k, v) => (s"${k}_${Random.nextInt(10)}", v) } // 加盐
.reduceByKey(_ + _) // 分散聚合
.map { case (k, v) => (k.split("_")(0), v) } // 去盐
.reduceByKey(_ + _) // 合并结果

// 步骤 4:合并两部分结果
val finalResult = nonSkewedResult.union(skewedResult).reduceByKey(_ + _)

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10