0%

序列化

Hadoop 序列化机制:从 Writable 到 Avro

序列化是分布式计算框架的核心组件,负责将对象转换为字节流以便在网络传输或磁盘存储。Hadoop 生态系统提供了多种序列化方案,其中 Writable 接口 是最基础的实现,而 Avro、Protobuf 等则在兼容性和性能上进行了扩展。本文将深入解析 Hadoop 序列化机制的原理、应用场景及优化策略。

Writable 接口核心原理

接口定义与作用

Writable 是 Hadoop 自定义的序列化接口,所有可序列化的类必须实现该接口:

1
2
3
4
public interface Writable {  
void write(DataOutput out) throws IOException; // 对象转字节流
void readFields(DataInput in) throws IOException; // 字节流转对象
}
  • 设计目标:轻量级、高效的序列化,减少网络和磁盘 I/O 开销;
  • 应用场景:MapReduce 中 Key/Value 类型(如 IntWritableText)必须实现 Writable。

核心实现类分析:IntWritable

IntWritable 是 Hadoop 对 int 类型的封装,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class IntWritable implements Writable {  
private int value;

public void set(int value) { this.value = value; }
public int get() { return value; }

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(value); // 写入4字节整数
}

@Override
public void readFields(DataInput in) throws IOException {
value = in.readInt(); // 读取4字节整数
}
}

特性

  • 固定长度存储(4 字节),无需额外元数据;
  • 读写操作直接映射为 JDK 的 DataOutput/DataInput,性能高效。

常用 Writable 类型

类型 描述 对应 Java 类型
NullWritable 空值,仅占位 null
IntWritable 32 位整数 int
LongWritable 64 位整数 long
Text UTF-8 字符串 String
BytesWritable 字节数组 byte[]
ArrayWritable 数组 Object[]
MapWritable 键值对 Map<Writable, Writable>

自定义 Writable 实现:案例分析

场景需求

假设需要在 MapReduce 中传递用户信息(UserInfo),包含:

  • 用户 ID(long
  • 用户名(String
  • 年龄(int
  • 注册时间(long,Unix 时间戳)

实现步骤

(1)定义 UserInfo 类并实现 Writable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.io.DataInput;  
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class UserInfo implements Writable {
private long userId;
private String userName;
private int age;
private long registerTime;

// 无参构造函数(必须)
public UserInfo() {}

public UserInfo(long userId, String userName, int age, long registerTime) {
this.userId = userId;
this.userName = userName;
this.age = age;
this.registerTime = registerTime;
}

// Getters and Setters
public long getUserId() { return userId; }
public String getUserName() { return userName; }
public int getAge() { return age; }
public long getRegisterTime() { return registerTime; }

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(userId); // 写入 long(8字节)
Text.writeString(out, userName); // 写入 String(UTF-8 编码)
out.writeInt(age); // 写入 int(4字节)
out.writeLong(registerTime); // 写入 long(8字节)
}

@Override
public void readFields(DataInput in) throws IOException {
userId = in.readLong();
userName = Text.readString(in);
age = in.readInt();
registerTime = in.readLong();
}

@Override
public String toString() {
return userId + "\t" + userName + "\t" + age + "\t" + registerTime;
}
}
(2)关键点说明
  • 无参构造函数:必须提供,用于反射创建对象;
  • 字段读写顺序write()readFields() 必须严格一致;
  • 字符串处理:使用 Text.writeString()Text.readString() 处理可变长度字符串;
  • 兼容性:新增字段需修改序列化逻辑,可能导致版本不兼容(需通过版本号管理)。

Writable 的进阶扩展:WritableComparable

接口定义

若需对 Writable 对象进行排序(如 MapReduce 的 Shuffle 阶段),需实现 WritableComparable 接口:

1
2
3
public interface WritableComparable<T> extends Writable, Comparable<T> {  
// 继承 compareTo 方法用于比较
}

案例:为 UserInfo 添加排序能力

假设需按用户注册时间降序排序,相同时间按用户 ID 升序排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class UserInfo implements WritableComparable<UserInfo> {  
// 原有字段和方法...

@Override
public int compareTo(UserInfo other) {
// 先按注册时间降序
int cmp = Long.compare(other.registerTime, this.registerTime);
if (cmp != 0) return cmp;

// 再按用户 ID 升序
return Long.compare(this.userId, other.userId);
}

@Override
public int hashCode() {
return (int) (userId ^ (userId >>> 32));
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
UserInfo other = (UserInfo) obj;
return userId == other.userId;
}
}

Hadoop 序列化生态系统

Writable 的局限性

  • 二进制格式:不支持跨语言(如 Python、C++);
  • 手动实现:需编写大量模板代码,维护成本高;
  • 版本兼容性差:新增字段需修改序列化逻辑,可能导致旧版本无法解析。

替代方案对比

方案 特点 适用场景
Avro - 支持 schema 演化 - 提供 JSON 和二进制格式 - 自动生成序列化代码 - 跨语言支持 大数据生态系统(如 Kafka、Hive) 数据交换与长期存储
Protobuf - 高性能二进制格式 - 强类型系统 - 广泛的跨语言支持 - 自动生成代码 微服务间通信 对性能要求极高的场景
Thrift - 支持多协议(二进制、JSON) - 服务接口定义语言(IDL) - 跨语言 RPC 分布式服务框架 需要远程调用的场景

Avro 集成示例

在 Hadoop 中使用 Avro 替代 Writable,实现更灵活的序列化:

(1)定义 Avro Schema(user.avsc)
1
2
3
4
5
6
7
8
9
10
11
{  
"type": "record",
"name": "UserInfo",
"namespace": "com.example",
"fields": [
{"name": "userId", "type": "long"},
{"name": "userName", "type": "string"},
{"name": "age", "type": "int"},
{"name": "registerTime", "type": "long"}
]
}
(2)生成 Java 类并使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.avro.Schema;  
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvroExample {
public static class UserMapper extends Mapper<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
AvroKey<Long>, AvroValue<GenericRecord>> {

@Override
protected void map(AvroKey<GenericRecord> key, AvroValue<GenericRecord> value, Context context) {
// 直接处理 Avro 记录,无需手动序列化
GenericRecord user = value.datum();
long userId = (Long) user.get("userId");
// 业务逻辑...
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Avro Example");

// 设置输入输出格式为 Avro
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);

// 其他配置...
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

序列化性能优化建议

1. 选择合适的序列化方案

  • 性能优先:Writable(原生 Hadoop)、Protobuf;
  • 兼容性优先:Avro(支持 schema 演化);
  • 跨语言需求:Avro、Protobuf、Thrift。

2. 减少序列化数据量

  • 避免传输冗余字段,只保留必要数据;
  • 使用压缩格式(如 Avro 的二进制格式比 JSON 更紧凑);
  • 对高频数据使用编码优化(如字典编码、增量编码)。

3. 优化自定义 Writable

  • 对可变长度字段(如字符串)使用高效编码(如 UTF-8);
  • 实现 WritableComparable 以避免额外排序开销;
  • 缓存常用对象,减少对象创建(如 Text 对象可复用)。

4. 序列化框架对比(性能参考)

框架 序列化速度 反序列化速度 空间效率
Writable 极快 极快
Protobuf
Avro(二进制)
Avro(JSON)
Java 原生

欢迎关注我的其它发布渠道