0%

RDD序列化

spark RDD序列化序列化详解与优化

在 Spark 分布式计算中,序列化是影响性能的关键因素之一。当数据在 Driver 与 Executor 之间传输,或需要持久化存储时,都需要进行序列化。本文深入分析 Spark 的序列化机制,对比 Java 与 Kryo 的差异,并提供优化实践指南。

序列化机制概述

序列化的作用

  • 跨节点传输:将对象从 Driver 发送到 Executor,或在 Executor 之间交换数据;
  • 持久化存储:将 RDD 缓存到内存或磁盘时,需要序列化对象;
  • 任务闭包:Driver 端的变量和函数传递到 Executor 执行时,需序列化闭包。

Spark 支持的序列化方式

序列化方式 特点 性能 兼容性
Java 序列化 默认方式,支持所有实现 Serializable 的类,无需额外配置
Kryo 序列化 自定义二进制序列化,速度是 Java 的 10 倍,需注册类 中等
Avro/Protobuf 支持跨语言的结构化数据序列化,需依赖第三方库

Java 序列化与 Kryo 对比

Java 序列化

  • 优点:无需额外配置,支持所有实现 java.io.Serializable 的类;
  • 缺点
    • 序列化后的字节数大(通常是 Kryo 的 3~5 倍);
    • 序列化速度慢,影响网络传输和磁盘 I/O;
    • 可能触发类加载问题(如版本不一致)。

示例

1
2
3
4
5
// 定义可序列化的类(实现 Serializable 接口)  
case class Person(name: String, age: Int) extends java.io.Serializable

val rdd = sc.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
rdd.cache() // 默认使用 Java 序列化缓存

Kryo 序列化

  • 优点
    • 速度快:序列化和反序列化性能显著优于 Java;
    • 空间效率高:生成的字节数更少;
    • 支持自定义序列化器。
  • 缺点
    • 需要手动注册类(否则会动态注册,影响性能);
    • 不支持所有 Java 特性(如内部类);
    • 需要额外配置。

配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.serializer.KryoSerializer  

val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("KryoExample")
.set("spark.serializer", classOf[KryoSerializer].getName) // 启用 Kryo
.set("spark.kryo.registrationRequired", "true") // 强制注册所有类
.registerKryoClasses(Array(
classOf[Person],
classOf[MyCustomClass]
)) // 注册自定义类

val sc = new SparkContext(conf)

Kryo 优化实践

注册常用类

通过 registerKryoClasses 注册频繁使用的类,避免动态注册开销:

1
2
3
4
5
6
7
8
9
// 注册单个类  
conf.registerKryoClasses(Array(classOf[Person]))

// 注册多个类
conf.registerKryoClasses(Array(
classOf[Person],
classOf[java.util.HashMap[_, _]],
classOf[org.apache.hadoop.io.Text]
))

自定义序列化器

对于复杂对象或性能敏感场景,可自定义 Kryo 序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.esotericsoftware.kryo.Kryo  
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output

// 自定义 Person 类的序列化器
class PersonSerializer extends Serializer[Person] {
override def write(kryo: Kryo, output: Output, person: Person): void = {
output.writeString(person.name)
output.writeInt(person.age)
}

override def read(kryo: Kryo, input: Input, clazz: Class[Person]): Person = {
val name = input.readString()
val age = input.readInt()
new Person(name, age)
}
}

// 注册自定义序列化器
conf.set("spark.kryo.customClasses", "com.example.PersonSerializer")

调整 Kryo 参数

通过 SparkConf 调整 Kryo 相关参数:

参数名 含义 默认值 优化建议
spark.serializer 序列化器类名 JavaSerializer 改为 KryoSerializer
spark.kryoserializer.buffer.max 单个对象最大序列化缓冲区大小 64m 大对象场景增加到 512m
spark.kryo.registrationRequired 是否强制注册类 false 设为 true 避免动态注册
spark.kryo.referenceTracking 是否跟踪对象引用 true 无循环引用时设为 false

常见序列化问题与解决方案

1. NotSerializableException

现象:RDD 操作时抛出 java.io.NotSerializableException
原因:闭包中引用了未序列化的对象(如数据库连接、非序列化类)。
解决方案

  • 确保闭包中只引用可序列化的对象;
  • 将不可序列化的对象标记为 @transient,并在 Executor 端重新初始化;
  • 使用 mapPartitions 减少闭包捕获(每个分区初始化一次)。

示例

1
2
3
4
5
6
7
8
9
10
11
12
class DatabaseConnection { /* ... */ }  // 未实现 Serializable  

val conn = new DatabaseConnection() // 不可序列化对象

// 错误写法:闭包捕获了 conn
val rdd = sc.parallelize(1 to 10).map(x => conn.query(x))

// 正确写法:使用 mapPartitions 并在 Executor 端初始化
val rdd = sc.parallelize(1 to 10).mapPartitions(iter => {
val conn = new DatabaseConnection() // 在 Executor 端创建
iter.map(x => conn.query(x))
})

2. 序列化性能瓶颈

现象:任务执行缓慢,尤其是 Shuffle 阶段。
解决方案

  • 切换到 Kryo 序列化;
  • 注册常用类;
  • 避免序列化大对象(如整个数据集),优先处理分区;
  • 使用 spark.kryoserializer.buffer.max 增加缓冲区大小。

3. 类版本不一致问题

现象:反序列化时抛出 ClassNotFoundExceptionInvalidClassException
解决方案

  • 确保所有节点使用相同版本的类;
  • 为自定义类添加 serialVersionUID
  • 使用 Kryo 时,注册类并启用 spark.kryo.registrationRequired

序列化最佳实践总结

  1. 优先使用 Kryo
    • 配置 spark.serializer=org.apache.spark.serializer.KryoSerializer
    • 注册所有自定义类(尤其是频繁使用的类)。
  2. 优化数据结构
    • 避免序列化复杂对象或大集合;
    • 使用基本类型数组替代集合类(如 Array[Int] 替代 List[Int])。
  3. 减少闭包捕获
    • 只在闭包中引用必要的变量;
    • 使用 mapPartitions 减少对象序列化次数。
  4. 监控与调优
    • 通过 Spark UI 监控序列化时间;
    • 根据对象大小调整 spark.kryoserializer.buffer.max
    • 使用自定义序列化器优化特殊类型。

示例代码:Kryo 配置与使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.spark.{SparkConf, SparkContext}  

// 定义需要序列化的类
case class Person(name: String, age: Int)
case class Order(id: Long, items: Array[String])

object SerializationExample {
def main(args: Array[String]): Unit = {
// 创建配置并启用 Kryo 序列化
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("SerializationExample")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(Array(
classOf[Person],
classOf[Order],
classOf[Array[String]] // 注册常用类型
))

val sc = new SparkContext(conf)

// 创建并操作 RDD
val people = sc.parallelize(Seq(
Person("Alice", 25),
Person("Bob", 30)
))

// 缓存 RDD(使用 Kryo 序列化)
people.cache()

// 执行操作
val result = people.filter(_.age > 25).collect()
result.foreach(println)

sc.stop()
}
}

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10