0%

RDD编程

Spark RDD 编程全指南:从创建到算子实战

RDD(弹性分布式数据集)是 Spark 分布式计算的核心抽象,其编程模型基于转换算子(Transformations)行动算子(Actions) 实现数据处理。本文系统讲解 RDD 的创建方式、分区控制及核心算子的使用场景与实战示例,帮助开发者掌握 RDD 编程的核心技巧。

RDD 的创建方式

RDD 的创建是 Spark 编程的第一步,根据数据源不同,可分为从集合(内存)创建从外部存储(文件)创建两大类。

1. 从集合(内存)创建 RDD

适用于本地测试或小型数据集,通过 SparkContext 的 parallelizemakeRDD 方法将内存集合转换为 RDD。

核心方法
  • parallelize(seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  • makeRDD(seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T](内部调用 parallelize
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.{SparkConf, SparkContext}  

object RDDCreate {
def main(args: Array[String]): Unit = {
// 初始化 Spark 配置与上下文
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDCreate")
val sc = new SparkContext(conf)

// 定义内存集合
val list = List(1, 2, 3, 4, 5)

// 方式 1:parallelize 创建 RDD
val rdd1 = sc.parallelize(list)
// 方式 2:makeRDD 创建 RDD(推荐,语法更简洁)
val rdd2 = sc.makeRDD(list)

// 打印 RDD 内容(行动算子触发计算)
rdd2.collect().foreach(println)

// 关闭上下文
sc.stop()
}
}
分区控制

numSlices 参数指定分区数,默认值为 Spark 配置的 spark.default.parallelism(未配置时为 CPU 核心数)。分区数决定并行计算的 Task 数量:

1
2
3
// 创建 2 个分区的 RDD  
val rdd = sc.makeRDD(list, 2)
println(s"分区数:${rdd.getNumPartitions}") // 输出:分区数:2

2. 从外部存储创建 RDD

适用于大规模数据处理,支持读取本地文件、HDFS、Hive、数据库等外部数据源。

核心方法
  • textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]:按行读取文本文件,返回每行内容的 RDD。
  • wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]:按文件读取,返回(文件路径,文件内容)键值对 RDD。
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
// 读取本地文件(需用 file:// 前缀)  
val lineRDD = sc.textFile("file:///path/to/local/file.txt")
// 读取 HDFS 文件
val hdfsRDD = sc.textFile("hdfs:///user/data/input.txt")
// 按文件读取
val fileRDD = sc.wholeTextFiles("file:///path/to/directory")

// 打印 textFile 结果(每行内容)
lineRDD.collect().foreach(println)
// 打印 wholeTextFiles 结果(文件路径 -> 内容)
fileRDD.collect().foreach { case (path, content) =>
println(s"文件:$path,内容:$content")
}
分区特性
  • textFile 的分区数默认与文件块数一致(HDFS 默认 128MB / 块),minPartitions 可指定最小分区数;
  • wholeTextFiles 的分区数与文件数量相关,适合小文件批量处理。

RDD 转换算子(Transformations)

转换算子用于对 RDD 进行数据处理和转换,返回新的 RDD。转换算子是惰性计算的,仅在行动算子触发时才执行。

1. 单值类型转换算子

针对单个 RDD 进行转换,不涉及其他 RDD 的交互。

(1)map(f: T => U):元素映射

对 RDD 中的每个元素应用函数 f,返回新元素的 RDD。

示例:将整数翻倍

1
2
3
val rdd = sc.makeRDD(List(1, 2, 3, 4))  
val mapRDD = rdd.map(_ * 2) // 每个元素乘以 2
mapRDD.collect().foreach(println) // 输出:2, 4, 6, 8
(2)mapPartitions(f: Iterator[T] => Iterator[U]):分区映射

对每个分区的迭代器应用函数 f,返回新迭代器的 RDD。适合分区内批量处理(如数据库连接复用)。

示例:分区内元素求和

1
2
3
4
5
6
7
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)  
// 每个分区的元素求和(返回迭代器)
val mapPartRDD = rdd.mapPartitions(iter => {
val sum = iter.sum
Iterator(sum) // 每个分区返回一个求和结果
})
mapPartRDD.collect().foreach(println) // 输出:3(1+2)、7(3+4)

map 的区别

  • map 逐条处理元素,mapPartitions 按分区批量处理,减少函数调用开销;
  • mapPartitions 可能因分区数据过大导致内存溢出(OOM),需谨慎使用。
(3)mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U]):带分区索引的映射

mapPartitions 基础上增加分区索引参数,可针对不同分区执行差异化逻辑。

示例:标记元素所属分区

1
2
3
4
5
6
7
8
9
10
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)  
val indexedRDD = rdd.mapPartitionsWithIndex((index, iter) => {
iter.map(num => s"分区 $index: $num")
})
indexedRDD.collect().foreach(println)
// 输出:
// 分区 0: 1
// 分区 0: 2
// 分区 1: 3
// 分区 1: 4
(4)flatMap(f: T => TraversableOnce[U]):扁平化映射

先映射后扁平化,将元素转换为集合后拆分为单个元素。

示例:拆分句子为单词

1
2
3
4
val rdd = sc.makeRDD(List("Hello Spark", "Hello World"))  
// 先按空格拆分(返回 List),再扁平化为单个单词
val flatMapRDD = rdd.flatMap(_.split(" "))
flatMapRDD.collect().foreach(println) // 输出:Hello, Spark, Hello, World
(5)filter(f: T => Boolean):过滤

保留满足条件(f 返回 true)的元素。

示例:筛选偶数

1
2
3
val rdd = sc.makeRDD(List(1, 2, 3, 4))  
val filterRDD = rdd.filter(_ % 2 == 0)
filterRDD.collect().foreach(println) // 输出:2, 4
(6)groupBy(f: T => K):分组

按函数 f 的返回值(Key)对元素分组,返回 (K, Iterable[T]) 类型的 RDD。

示例:按奇偶分组

1
2
3
4
5
6
7
8
9
val rdd = sc.makeRDD(List(1, 2, 3, 4))  
// 按 num % 2 分组(Key 为 0 或 1)
val groupRDD = rdd.groupBy(_ % 2)
groupRDD.collect().foreach { case (key, iter) =>
println(s"Key $key: ${iter.mkString(",")}")
}
// 输出:
// Key 1: 1,3
// Key 0: 2,4
(7)distinct():去重

去除 RDD 中的重复元素(内部通过 map(x => (x, null)).reduceByKey((_, _) => _).keys 实现)。

示例:去除重复值

1
2
3
val rdd = sc.makeRDD(List(1, 2, 2, 3, 3, 3))  
val distinctRDD = rdd.distinct()
distinctRDD.collect().foreach(println) // 输出:1, 2, 3
(8)coalesce(numPartitions: Int, shuffle: Boolean = false)repartition(numPartitions: Int):调整分区
  • coalesce:默认不触发 Shuffle,适合减少分区(合并小分区),避免数据倾斜;
  • repartition:内部调用 coalesce(numPartitions, shuffle = true),触发 Shuffle,适合增加或均衡分区。

示例:合并分区

1
2
3
4
5
val rdd = sc.makeRDD(1 to 6, 3)  // 3 个分区  
// 合并为 2 个分区(不 Shuffle,可能数据不均)
val coalesceRDD = rdd.coalesce(2)
// 重分区为 2 个分区(Shuffle,数据均衡)
val repartitionRDD = rdd.repartition(2)

2. 双值类型转换算子

对两个 RDD 进行联合操作,如交集、并集、差集等。

(1)union(other: RDD[T]):并集

合并两个 RDD 的元素(不去重)。

1
2
3
4
val rdd1 = sc.makeRDD(List(1, 2, 3))  
val rdd2 = sc.makeRDD(List(3, 4, 5))
val unionRDD = rdd1.union(rdd2)
unionRDD.collect().foreach(println) // 输出:1, 2, 3, 3, 4, 5
(2)intersection(other: RDD[T]):交集

保留两个 RDD 共有的元素(去重)。

1
2
val intersectionRDD = rdd1.intersection(rdd2)  
intersectionRDD.collect().foreach(println) // 输出:3
(3)subtract(other: RDD[T]):差集

保留当前 RDD 有而 other RDD 没有的元素。

1
2
val subtractRDD = rdd1.subtract(rdd2)  
subtractRDD.collect().foreach(println) // 输出:1, 2
(4)zip(other: RDD[U]):拉链

将两个 RDD 的元素按位置配对为 (T, U),要求两个 RDD 分区数和元素数均相同。

1
2
3
4
val rdd1 = sc.makeRDD(List(1, 2, 3))  
val rdd2 = sc.makeRDD(List("a", "b", "c"))
val zipRDD = rdd1.zip(rdd2)
zipRDD.collect().foreach(println) // 输出:(1,a), (2,b), (3,c)

3. 键值类型转换算子

针对 (K, V) 类型的 RDD(需通过隐式转换为 PairRDDFunctions),提供分组、聚合、连接等操作。

(1)reduceByKey(func: (V, V) => V):按 Key 聚合

对相同 Key 的 Value 应用聚合函数(如求和、求最大值),支持 Map 端预聚合(减少 Shuffle 数据量)。

示例:按 Key 求和

1
2
3
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4)))  
val reduceRDD = rdd.reduceByKey(_ + _) // 相同 Key 的 Value 相加
reduceRDD.collect().foreach(println) // 输出:(a,3), (b,7)
(2)groupByKey():按 Key 分组

对相同 Key 的 Value 进行分组,返回 (K, Iterable[V])(无预聚合,Shuffle 数据量大,性能低于 reduceByKey)。

示例:按 Key 分组

1
2
3
4
5
6
7
val groupRDD = rdd.groupByKey()  
groupRDD.collect().foreach { case (key, iter) =>
println(s"$key: ${iter.mkString(",")}")
}
// 输出:
// a: 1,2
// b: 3,4
(3)aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U):自定义聚合

支持分区内和分区间使用不同的聚合逻辑,zeroValue 为初始值。

示例:按 Key 求分区内最大值、分区间求和

1
2
3
4
5
6
7
val rdd = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 2), ("b", 4)), 2)  
// 初始值 0,分区内求最大值,分区间求和
val aggRDD = rdd.aggregateByKey(0)(
(x, y) => math.max(x, y), // 分区内:保留最大值
(x, y) => x + y // 分区间:最大值相加
)
aggRDD.collect().foreach(println) // 输出:(a,3), (b,6)(分区 0:a=3,分区 1:b=4 → 3+0=3,4+2=6)
(4)join(other: RDD[(K, W)]):按 Key 连接

类似数据库内连接,返回 (K, (V, W)),仅保留两边都存在的 Key。

示例:两 RDD 连接

1
2
3
4
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2)))  
val rdd2 = sc.makeRDD(List(("a", 3), ("b", 4), ("c", 5)))
val joinRDD = rdd1.join(rdd2)
joinRDD.collect().foreach(println) // 输出:(a,(1,3)), (b,(2,4))

RDD 行动算子(Actions)

行动算子触发 Spark 作业的执行(转换算子仅记录逻辑计划),返回结果或写入外部存储。

1. collect(): Array[T]:收集所有元素

将 RDD 所有分区的元素拉取到 Driver 节点,返回数组(仅用于小数据集,避免 OOM)。

1
2
val rdd = sc.makeRDD(List(1, 2, 3))  
val result = rdd.collect() // 触发计算,返回 Array(1,2,3)

2. count(): Long:计数

返回 RDD 元素的总数量。

1
val count = rdd.count()  // 输出:3  

3. take(n: Int): Array[T]:取前 n 个元素

返回 RDD 的前 n 个元素(按分区顺序获取,非全局排序)。

1
val top2 = rdd.take(2)  // 输出:Array(1, 2)  

4. reduce(func: (T, T) => T):全局聚合

对 RDD 所有元素应用聚合函数(先分区内聚合,再分区间聚合)。

1
val sum = rdd.reduce(_ + _)  // 输出:6(1+2+3)  

5. foreach(f: T => Unit):遍历元素

对每个元素应用函数 f(分布式执行,无返回值,常用于输出结果到外部系统)。

1
2
3
4
// 打印每个元素(分布式执行,Driver 端看不到输出)  
rdd.foreach(println)
// 若需在 Driver 端打印,需先 collect
rdd.collect().foreach(println)

6. saveAsTextFile(path: String):保存为文本文件

将 RDD 元素按行写入外部存储(本地路径或 HDFS),每个分区生成一个文件。

1
rdd.saveAsTextFile("hdfs:///user/output/result")  // 保存到 HDFS  

核心算子对比与最佳实践

场景 推荐算子 避免使用 原因
聚合计算 reduceByKeyaggregateByKey groupByKey groupByKey 无预聚合,Shuffle 数据量大,性能低;reduceByKey 在 Map 端预聚合,减少网络传输
元素转换 map(逐条处理) mapPartitions(内存有限时) mapPartitions 按分区批量处理,性能高但可能因数据量大导致 OOM;map 内存友好但函数调用开销高
分区调整 coalesce(减少分区)、repartition(增加 / 均衡分区) 频繁调整分区 调整分区可能触发 Shuffle,增加额外开销,应在数据加载时合理设置初始分区
去重 distinct(小数据集) distinct(超大规模数据集) distinct 依赖 Shuffle,超大规模数据建议通过业务逻辑提前去重(如 SQL distinct
连接操作 join(内连接)、leftOuterJoin(左外连接) 大表 join 大表 大表连接易触发数据倾斜,建议通过广播小表(broadcastJoin)或加盐哈希优化

最佳实践总结

  1. 优先使用预聚合算子:如 reduceByKey 替代 groupByKey,减少 Shuffle 数据量。
  2. 控制分区数量:每个分区数据量建议为 128MB~256MB,避免小分区(调度开销大)或大分区(OOM 风险)。
  3. 避免全量收集数据collect() 仅用于调试,生产环境用 take(n) 或直接写入外部存储。
  4. 数据倾斜处理:对热点 Key 采用加盐、拆分单独处理等策略,避免单个 Task 耗时过长。
  5. 合理使用缓存:对重复使用的 RDD 调用 persistcache,选择序列化存储(如 MEMORY_AND_DISK_SER)节省内存。

RDD 编程常见问题与解决方案

1. 数据倾斜(某 Task 执行时间过长)

  • 现象:大部分 Task 快速完成,少数 Task 耗时远超平均(如 10 分钟 vs 1 分钟)。
  • 原因:某 Key 对应数据量过大(如 90% 数据集中在一个 Key)。
  • 解决方案
    • 加盐哈希:对倾斜 Key 添加随机前缀,分散到多个分区计算后合并(见前文 reduceByKey 示例);
    • 过滤倾斜 Key:单独处理倾斜 Key(如拆分逻辑,避免参与全局 Shuffle);
    • 使用 aggregateByKey:通过预聚合减少倾斜 Key 的数据量。

2. 内存溢出(OOM)

  • 现象:Executor 或 Driver 日志报 java.lang.OutOfMemoryError
  • 常见原因
    • 单个分区数据量过大(如 1GB 以上);
    • collect() 拉取全量大数据到 Driver;
    • 缓存未序列化的大 RDD。
  • 解决方案
    • 增加分区数(repartition),减小单个分区数据量;
    • 避免 collect(),改用 saveAsTextFile 直接输出;
    • 缓存 RDD 时使用序列化存储(MEMORY_ONLY_SERMEMORY_AND_DISK_SER)。

3. Shuffle 性能差

  • 现象:Shuffle 阶段耗时过长,磁盘 I/O 或网络传输瓶颈。
  • 解决方案
    • 调整并行度(spark.default.parallelism 设为总核数的 2~3 倍);
    • 启用 Shuffle 压缩(spark.shuffle.compress=true);
    • 选择合适的分区器(如 RangePartitioner 优化排序场景)。

4. 任务本地化级别低

  • 现象:Task 日志显示 NODE_LOCALANY 本地化级别,网络传输频繁。
  • 原因:数据与计算任务不在同一节点。
  • 解决方案
    • 读取数据时尽量使用分布式存储(如 HDFS),利用数据本地化;
    • 调整本地化等待时间(spark.locality.wait=3s),允许任务等待数据所在节点资源。

RDD 编程实战案例:WordCount 优化

以经典的 WordCount 任务为例,综合运用 RDD 算子与优化技巧:

需求

统计文本文件中单词出现次数,处理大文件(10GB)并避免数据倾斜。

优化方案

  1. 合理分区:按文件大小设置分区数(10GB / 256MB ≈ 40 分区);
  2. 预聚合减少 Shuffle:使用 reduceByKey 替代 groupByKey
  3. 处理热点单词:对高频单词(如 “the”“a”)加盐拆分,避免单个 Task 负载过高。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.spark.{SparkConf, SparkContext}  

object OptimizedWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("yarn")
.setAppName("OptimizedWordCount")
.set("spark.default.parallelism", "40") // 并行度设为 40
val sc = new SparkContext(conf)

// 读取 HDFS 文件(40 分区)
val lines = sc.textFile("hdfs:///user/data/bigfile.txt", 40)

// 定义热点单词列表(可从历史统计获取)
val hotWords = Set("the", "a", "is")

// 步骤 1:拆分单词并对热点单词加盐
val words = lines.flatMap(line => line.split("\\s+"))
.map(word => {
if (hotWords.contains(word)) {
// 热点单词添加随机前缀(0-9)
val salt = scala.util.Random.nextInt(10)
(s"$salt|$word", 1)
} else {
(word, 1)
}
})

// 步骤 2:聚合(预聚合减少 Shuffle 数据)
val counts = words.reduceByKey(_ + _)

// 步骤 3:热点单词去盐并二次聚合
val result = counts.map { case (key, count) =>
if (key.contains("|")) {
val Array(_, word) = key.split("\\|")
(word, count)
} else {
(key, count)
}
}.reduceByKey(_ + _) // 合并热点单词结果

// 输出结果到 HDFS
result.saveAsTextFile("hdfs:///user/output/wordcount_result")

sc.stop()
}
}

欢迎关注我的其它发布渠道