Spark RDD 编程全指南:从创建到算子实战
RDD(弹性分布式数据集)是 Spark 分布式计算的核心抽象,其编程模型基于转换算子(Transformations) 和行动算子(Actions) 实现数据处理。本文系统讲解 RDD 的创建方式、分区控制及核心算子的使用场景与实战示例,帮助开发者掌握 RDD 编程的核心技巧。
RDD 的创建方式
RDD 的创建是 Spark 编程的第一步,根据数据源不同,可分为从集合(内存)创建和从外部存储(文件)创建两大类。
1. 从集合(内存)创建 RDD
适用于本地测试或小型数据集,通过 SparkContext 的 parallelize 或 makeRDD 方法将内存集合转换为 RDD。
核心方法
parallelize(seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]makeRDD(seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T](内部调用parallelize)
示例代码
1 | import org.apache.spark.{SparkConf, SparkContext} |
分区控制
numSlices 参数指定分区数,默认值为 Spark 配置的 spark.default.parallelism(未配置时为 CPU 核心数)。分区数决定并行计算的 Task 数量:
1 | // 创建 2 个分区的 RDD |
2. 从外部存储创建 RDD
适用于大规模数据处理,支持读取本地文件、HDFS、Hive、数据库等外部数据源。
核心方法
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]:按行读取文本文件,返回每行内容的 RDD。wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]:按文件读取,返回(文件路径,文件内容)键值对 RDD。
示例代码
1 | // 读取本地文件(需用 file:// 前缀) |
分区特性
textFile的分区数默认与文件块数一致(HDFS 默认 128MB / 块),minPartitions可指定最小分区数;wholeTextFiles的分区数与文件数量相关,适合小文件批量处理。
RDD 转换算子(Transformations)
转换算子用于对 RDD 进行数据处理和转换,返回新的 RDD。转换算子是惰性计算的,仅在行动算子触发时才执行。
1. 单值类型转换算子
针对单个 RDD 进行转换,不涉及其他 RDD 的交互。
(1)map(f: T => U):元素映射
对 RDD 中的每个元素应用函数 f,返回新元素的 RDD。
示例:将整数翻倍
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4)) |
(2)mapPartitions(f: Iterator[T] => Iterator[U]):分区映射
对每个分区的迭代器应用函数 f,返回新迭代器的 RDD。适合分区内批量处理(如数据库连接复用)。
示例:分区内元素求和
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) |
与 map 的区别:
map逐条处理元素,mapPartitions按分区批量处理,减少函数调用开销;mapPartitions可能因分区数据过大导致内存溢出(OOM),需谨慎使用。
(3)mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U]):带分区索引的映射
在 mapPartitions 基础上增加分区索引参数,可针对不同分区执行差异化逻辑。
示例:标记元素所属分区
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) |
(4)flatMap(f: T => TraversableOnce[U]):扁平化映射
先映射后扁平化,将元素转换为集合后拆分为单个元素。
示例:拆分句子为单词
1 | val rdd = sc.makeRDD(List("Hello Spark", "Hello World")) |
(5)filter(f: T => Boolean):过滤
保留满足条件(f 返回 true)的元素。
示例:筛选偶数
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4)) |
(6)groupBy(f: T => K):分组
按函数 f 的返回值(Key)对元素分组,返回 (K, Iterable[T]) 类型的 RDD。
示例:按奇偶分组
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4)) |
(7)distinct():去重
去除 RDD 中的重复元素(内部通过 map(x => (x, null)).reduceByKey((_, _) => _).keys 实现)。
示例:去除重复值
1 | val rdd = sc.makeRDD(List(1, 2, 2, 3, 3, 3)) |
(8)coalesce(numPartitions: Int, shuffle: Boolean = false) 与 repartition(numPartitions: Int):调整分区
coalesce:默认不触发 Shuffle,适合减少分区(合并小分区),避免数据倾斜;repartition:内部调用coalesce(numPartitions, shuffle = true),触发 Shuffle,适合增加或均衡分区。
示例:合并分区
1 | val rdd = sc.makeRDD(1 to 6, 3) // 3 个分区 |
2. 双值类型转换算子
对两个 RDD 进行联合操作,如交集、并集、差集等。
(1)union(other: RDD[T]):并集
合并两个 RDD 的元素(不去重)。
1 | val rdd1 = sc.makeRDD(List(1, 2, 3)) |
(2)intersection(other: RDD[T]):交集
保留两个 RDD 共有的元素(去重)。
1 | val intersectionRDD = rdd1.intersection(rdd2) |
(3)subtract(other: RDD[T]):差集
保留当前 RDD 有而 other RDD 没有的元素。
1 | val subtractRDD = rdd1.subtract(rdd2) |
(4)zip(other: RDD[U]):拉链
将两个 RDD 的元素按位置配对为 (T, U),要求两个 RDD 分区数和元素数均相同。
1 | val rdd1 = sc.makeRDD(List(1, 2, 3)) |
3. 键值类型转换算子
针对 (K, V) 类型的 RDD(需通过隐式转换为 PairRDDFunctions),提供分组、聚合、连接等操作。
(1)reduceByKey(func: (V, V) => V):按 Key 聚合
对相同 Key 的 Value 应用聚合函数(如求和、求最大值),支持 Map 端预聚合(减少 Shuffle 数据量)。
示例:按 Key 求和
1 | val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4))) |
(2)groupByKey():按 Key 分组
对相同 Key 的 Value 进行分组,返回 (K, Iterable[V])(无预聚合,Shuffle 数据量大,性能低于 reduceByKey)。
示例:按 Key 分组
1 | val groupRDD = rdd.groupByKey() |
(3)aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U):自定义聚合
支持分区内和分区间使用不同的聚合逻辑,zeroValue 为初始值。
示例:按 Key 求分区内最大值、分区间求和
1 | val rdd = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 2), ("b", 4)), 2) |
(4)join(other: RDD[(K, W)]):按 Key 连接
类似数据库内连接,返回 (K, (V, W)),仅保留两边都存在的 Key。
示例:两 RDD 连接
1 | val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2))) |
RDD 行动算子(Actions)
行动算子触发 Spark 作业的执行(转换算子仅记录逻辑计划),返回结果或写入外部存储。
1. collect(): Array[T]:收集所有元素
将 RDD 所有分区的元素拉取到 Driver 节点,返回数组(仅用于小数据集,避免 OOM)。
1 | val rdd = sc.makeRDD(List(1, 2, 3)) |
2. count(): Long:计数
返回 RDD 元素的总数量。
1 | val count = rdd.count() // 输出:3 |
3. take(n: Int): Array[T]:取前 n 个元素
返回 RDD 的前 n 个元素(按分区顺序获取,非全局排序)。
1 | val top2 = rdd.take(2) // 输出:Array(1, 2) |
4. reduce(func: (T, T) => T):全局聚合
对 RDD 所有元素应用聚合函数(先分区内聚合,再分区间聚合)。
1 | val sum = rdd.reduce(_ + _) // 输出:6(1+2+3) |
5. foreach(f: T => Unit):遍历元素
对每个元素应用函数 f(分布式执行,无返回值,常用于输出结果到外部系统)。
1 | // 打印每个元素(分布式执行,Driver 端看不到输出) |
6. saveAsTextFile(path: String):保存为文本文件
将 RDD 元素按行写入外部存储(本地路径或 HDFS),每个分区生成一个文件。
1 | rdd.saveAsTextFile("hdfs:///user/output/result") // 保存到 HDFS |
核心算子对比与最佳实践
| 场景 | 推荐算子 | 避免使用 | 原因 |
|---|---|---|---|
| 聚合计算 | reduceByKey、aggregateByKey |
groupByKey |
groupByKey 无预聚合,Shuffle 数据量大,性能低;reduceByKey 在 Map 端预聚合,减少网络传输 |
| 元素转换 | map(逐条处理) |
mapPartitions(内存有限时) |
mapPartitions 按分区批量处理,性能高但可能因数据量大导致 OOM;map 内存友好但函数调用开销高 |
| 分区调整 | coalesce(减少分区)、repartition(增加 / 均衡分区) |
频繁调整分区 | 调整分区可能触发 Shuffle,增加额外开销,应在数据加载时合理设置初始分区 |
| 去重 | distinct(小数据集) |
distinct(超大规模数据集) |
distinct 依赖 Shuffle,超大规模数据建议通过业务逻辑提前去重(如 SQL distinct) |
| 连接操作 | join(内连接)、leftOuterJoin(左外连接) |
大表 join 大表 |
大表连接易触发数据倾斜,建议通过广播小表(broadcastJoin)或加盐哈希优化 |
最佳实践总结
- 优先使用预聚合算子:如
reduceByKey替代groupByKey,减少 Shuffle 数据量。 - 控制分区数量:每个分区数据量建议为 128MB~256MB,避免小分区(调度开销大)或大分区(OOM 风险)。
- 避免全量收集数据:
collect()仅用于调试,生产环境用take(n)或直接写入外部存储。 - 数据倾斜处理:对热点 Key 采用加盐、拆分单独处理等策略,避免单个 Task 耗时过长。
- 合理使用缓存:对重复使用的 RDD 调用
persist或cache,选择序列化存储(如MEMORY_AND_DISK_SER)节省内存。
RDD 编程常见问题与解决方案
1. 数据倾斜(某 Task 执行时间过长)
- 现象:大部分 Task 快速完成,少数 Task 耗时远超平均(如 10 分钟 vs 1 分钟)。
- 原因:某 Key 对应数据量过大(如 90% 数据集中在一个 Key)。
- 解决方案:
- 加盐哈希:对倾斜 Key 添加随机前缀,分散到多个分区计算后合并(见前文
reduceByKey示例); - 过滤倾斜 Key:单独处理倾斜 Key(如拆分逻辑,避免参与全局 Shuffle);
- 使用
aggregateByKey:通过预聚合减少倾斜 Key 的数据量。
- 加盐哈希:对倾斜 Key 添加随机前缀,分散到多个分区计算后合并(见前文
2. 内存溢出(OOM)
- 现象:Executor 或 Driver 日志报
java.lang.OutOfMemoryError。 - 常见原因:
- 单个分区数据量过大(如 1GB 以上);
collect()拉取全量大数据到 Driver;- 缓存未序列化的大 RDD。
- 解决方案:
- 增加分区数(
repartition),减小单个分区数据量; - 避免
collect(),改用saveAsTextFile直接输出; - 缓存 RDD 时使用序列化存储(
MEMORY_ONLY_SER或MEMORY_AND_DISK_SER)。
- 增加分区数(
3. Shuffle 性能差
- 现象:Shuffle 阶段耗时过长,磁盘 I/O 或网络传输瓶颈。
- 解决方案:
- 调整并行度(
spark.default.parallelism设为总核数的 2~3 倍); - 启用 Shuffle 压缩(
spark.shuffle.compress=true); - 选择合适的分区器(如
RangePartitioner优化排序场景)。
- 调整并行度(
4. 任务本地化级别低
- 现象:Task 日志显示
NODE_LOCAL或ANY本地化级别,网络传输频繁。 - 原因:数据与计算任务不在同一节点。
- 解决方案:
- 读取数据时尽量使用分布式存储(如 HDFS),利用数据本地化;
- 调整本地化等待时间(
spark.locality.wait=3s),允许任务等待数据所在节点资源。
RDD 编程实战案例:WordCount 优化
以经典的 WordCount 任务为例,综合运用 RDD 算子与优化技巧:
需求
统计文本文件中单词出现次数,处理大文件(10GB)并避免数据倾斜。
优化方案
- 合理分区:按文件大小设置分区数(10GB / 256MB ≈ 40 分区);
- 预聚合减少 Shuffle:使用
reduceByKey替代groupByKey; - 处理热点单词:对高频单词(如 “the”“a”)加盐拆分,避免单个 Task 负载过高。
代码实现
1 | import org.apache.spark.{SparkConf, SparkContext} |