0%

编解码器

Netty 编解码器详解:从基础到 Protobuf 实战

在网络通信中,数据以二进制形式传输,而应用程序则需处理结构化数据(如字符串、对象)。编解码器是连接二进制数据与应用层对象的桥梁,Netty 提供了丰富的编解码器组件,简化了数据转换过程。本文将系统讲解 Netty 编解码器的设计原理、常用实现及自定义方法,并通过 Protobuf 实战展示跨语言序列化方案。

编解码器的核心概念

基本定义

  • 编码器(Encoder):将应用层对象转换为二进制字节流(出站操作),继承 ChannelOutboundHandler
  • 解码器(Decoder):将二进制字节流转换为应用层对象(入站操作),继承 ChannelInboundHandler

核心目标:屏蔽底层二进制数据的处理细节,让开发者专注于业务逻辑。

Netty 编解码器的设计模式

Netty 编解码器遵循责任链模式,通过 ChannelPipeline 串联多个编解码器和业务处理器:

  • 数据入站时,依次经过解码器 → 业务处理器。
  • 数据出站时,依次经过业务处理器 → 编码器。

例如,一个简单的字符串通信流程:

1
客户端发送 String → StringEncoder 编码为 ByteBuf → 网络传输 → 服务端 StringDecoder 解码为 String → 业务处理

Netty 内置编解码器

Netty 提供了多种开箱即用的编解码器,覆盖常见数据类型:

字符串编解码器

  • StringEncoder:将 String 编码为 ByteBuf(默认 UTF-8 编码)。
  • StringDecoder:将 ByteBuf 解码为 String

使用示例

1
2
3
4
// 服务端/客户端 pipeline 配置
pipeline.addLast(new StringDecoder()); // 入站解码
pipeline.addLast(new StringEncoder()); // 出站编码
pipeline.addLast(new BusinessHandler()); // 业务处理器

业务处理器中直接操作字符串

1
2
3
4
5
6
7
// 服务端接收字符串
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String content = (String) msg; // 已被 StringDecoder 转换
System.out.println("收到消息:" + content);
ctx.writeAndFlush("收到消息:" + content); // 会被 StringEncoder 编码
}

Java 对象编解码器

  • ObjectEncoder:通过 Java 序列化将对象编码为 ByteBuf
  • ObjectDecoder:通过 Java 反序列化将 ByteBuf 解码为对象。

使用示例

1
2
3
4
// 服务端配置(客户端类似)
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectHandler());

注意:Java 序列化存在严重缺陷(效率低、体积大、不跨语言),仅适合简单场景,生产环境建议使用 Protobuf 等现代序列化方案。

Protobuf 编解码器实战

Protobuf(Protocol Buffers)是 Google 推出的高效二进制序列化格式,具有跨语言、体积小、速度快等优势,是 Netty 中推荐的对象序列化方案。

环境配置(Maven)

通过 Maven 插件自动生成 Protobuf 对应的 Java 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<dependencies>
<!-- Netty 核心依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.57.Final</version>
</dependency>
<!-- Protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.3</version>
</dependency>
</dependencies>

<build>
<extensions>
<!-- 自动识别操作系统,下载对应 protoc 编译器 -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<!-- Protobuf 编译插件 -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<!-- 指定 protoc 版本 -->
<protocArtifact>com.google.protobuf:protoc:3.11.3:exe:${os.detected.classifier}</protocArtifact>
<!-- proto 文件目录 -->
<protoSourceRoot>src/main/proto</protoSourceRoot>
<!-- 生成 Java 类的输出目录 -->
<outputDirectory>src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal> <!-- 生成 Java 消息类 -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

定义 Protobuf 消息格式

src/main/proto 目录下创建 user.proto

1
2
3
4
5
6
7
8
9
syntax = "proto3"; // 使用 proto3 语法
package com.example.netty.codec; // 生成类的包名
option java_outer_classname = "UserProto"; // 外部类名

// 定义 User 消息结构
message User {
int32 id = 1; // 字段编号(用于二进制编码,与字段名无关)
string name = 2;
}

执行 mvn compile 生成 Java 类 UserProto(包含内部类 User 作为数据载体)。

服务端与客户端配置

服务端(解码 Protobuf 对象)
1
2
3
4
5
6
7
8
9
10
11
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 配置 Protobuf 解码器(指定默认实例,用于反序列化)
ch.pipeline().addLast(new ProtobufDecoder(UserProto.User.getDefaultInstance()));
ch.pipeline().addLast(new ServerHandler()); // 业务处理器
}
});

服务端处理器

1
2
3
4
5
6
7
8
9
10
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 直接接收解码后的 User 对象
UserProto.User user = (UserProto.User) msg;
System.out.println("收到用户:id=" + user.getId() + ", name=" + user.getName());
// 回复客户端(简单字符串,需自行配置 StringEncoder)
ctx.writeAndFlush("用户信息已接收");
}
}
客户端(编码 Protobuf 对象)
1
2
3
4
5
6
7
8
9
10
11
12
13
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 配置 Protobuf 编码器
ch.pipeline().addLast(new ProtobufEncoder());
// 配置字符串解码器(用于接收服务端回复)
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ClientHandler());
}
});

客户端处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ClientHandler extends ChannelInboundHandlerAdapter {
// 连接建立后发送 Protobuf 对象
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 构建 User 对象
UserProto.User user = UserProto.User.newBuilder()
.setId(1001)
.setName("Alice")
.build();
ctx.writeAndFlush(user); // 自动被 ProtobufEncoder 编码
}

// 接收服务端回复(字符串)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String response = (String) msg;
System.out.println("服务端回复:" + response);
}
}

自定义编解码器

当内置编解码器无法满足需求时(如自定义协议),可通过继承 Netty 基础类实现自定义编解码器。

自定义解码器(ByteToMessageDecoder)

将字节流转换为自定义对象,重写 decode 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CustomDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 确保有足够的字节(例如,协议格式:2字节长度 + 内容)
if (in.readableBytes() < 2) {
return; // 字节不足,等待后续数据
}
// 标记当前读指针位置
in.markReaderIndex();
// 读取长度
short length = in.readShort();
// 检查内容是否足够
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 重置指针,等待完整数据
return;
}
// 读取内容并转换为对象
byte[] content = new byte[length];
in.readBytes(content);
CustomObject obj = new CustomObject(new String(content, UTF_8));
out.add(obj); // 传递给下一个处理器
}
}

自定义编码器(MessageToByteEncoder)

将自定义对象转换为字节流,重写 encode 方法:

1
2
3
4
5
6
7
8
9
public class CustomEncoder extends MessageToByteEncoder<CustomObject> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomObject msg, ByteBuf out) {
// 协议格式:2字节长度 + 内容
byte[] content = msg.getContent().getBytes(UTF_8);
out.writeShort(content.length); // 写入长度
out.writeBytes(content); // 写入内容
}
}

使用自定义编解码器

1
2
3
4
// 在 pipeline 中添加自定义编解码器
pipeline.addLast(new CustomDecoder());
pipeline.addLast(new CustomEncoder());
pipeline.addLast(new BusinessHandler());

HTTP 编解码器

Netty 提供了专门的 HTTP 编解码器,用于处理 HTTP 协议:

编解码器 功能描述
HttpRequestDecoder 将字节流解码为 HttpRequestHttpContent
HttpResponseEncoder HttpResponseHttpContent 编码为字节流
HttpObjectAggregator 将 HTTP 分块数据聚合为完整的 FullHttpRequest/FullHttpResponse

使用示例(HTTP 服务端)

1
2
3
4
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合最大 64KB 的 HTTP 消息
pipeline.addLast(new HttpServerHandler());

编解码器使用最佳实践

  1. 顺序问题:编解码器在 ChannelPipeline 中的顺序至关重要:
    • 入站方向:先解码器,后业务处理器。
    • 出站方向:先业务处理器,后编码器。
  2. 资源管理:解码器需注意半包问题(如通过 markReaderIndex/resetReaderIndex 处理不完整数据)。
  3. 性能优化
    • 避免在编解码器中执行耗时操作(如数据库查询)。
    • 对高频消息使用池化缓冲区(PooledByteBuf)。
  4. 协议设计:自定义协议时应包含长度字段,便于解码器判断消息边界。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10