0%

netty示例

Netty 实战示例解析:服务端与客户端通信实现

Netty 作为高性能 NIO 框架,其核心优势在于通过事件驱动模型处理高并发网络通信。上述示例完整实现了一个基础的客户端 - 服务端通信流程,涵盖了 Netty 核心组件的协作方式。本文将深入解析该示例的工作原理、关键细节及优化方向。

示例整体流程梳理

该示例实现了一个简单的 TCP 通信场景:

  1. 服务端:绑定 8765 端口,监听客户端连接,接收客户端消息后回复 “hello”。
  2. 客户端:连接服务端 8765 端口,发送 “你好呀” 消息,接收服务端回复并打印。

交互时序

1
客户端启动 → 连接服务端 → 发送 "你好呀" → 服务端接收并打印 → 服务端回复 "hello" → 客户端接收并打印

服务端实现详解

核心组件初始化(Server.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. 初始化线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 处理连接建立(1个线程足够)
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理IO事件(默认CPU核心数*2线程)

// 2. 配置服务端引导类
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // 绑定线程组
.channel(NioServerSocketChannel.class) // 服务端通道类型(NIO非阻塞)
.option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小(等待被accept的连接数)
.childOption(ChannelOption.SO_KEEPALIVE, true) // 客户端通道启用心跳
.childHandler(new ChannelInitializer<SocketChannel>() { // 初始化客户端通道处理器
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ServerHandler()); // 添加自定义处理器
}
});

// 3. 启动服务
ChannelFuture future = bootstrap.bind(8765).sync(); // 绑定端口(同步等待)
future.channel().closeFuture().sync(); // 阻塞等待服务关闭

关键细节

  • bossGroupworkerGroup 分工:boss 仅负责接收连接,接收后将连接注册到 worker 线程处理后续 IO。
  • SO_BACKLOG:TCP 协议中,未完成三次握手的连接会放入半连接队列,已完成的放入全连接队列,SO_BACKLOG 定义全连接队列大小(超过则拒绝新连接)。
  • childHandler:为每个新接入的客户端 SocketChannel 配置处理器,而非服务端自身的 ServerSocketChannel

服务端处理器(ServerHandler

ServerHandler 继承 ChannelInboundHandlerAdapter,重写核心事件方法处理客户端消息:

方法 触发时机 作用
channelRead 客户端发送消息到达时 读取消息并转换为字符串,打印到控制台
channelReadComplete 消息读取完毕后 向客户端回复 “hello” 消息
exceptionCaught 发生异常时(如客户端断开连接) 关闭通道,释放资源

代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 读取客户端消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg; // Netty 数据容器(类似 ByteBuffer)
byte[] data = new byte[buf.readableBytes()]; // 获取可读取字节数
buf.readBytes(data); // 将数据读入字节数组
String request = new String(data, StandardCharsets.UTF_8); // 转换为字符串
System.out.println("接收到的消息为:->" + request);
}

// 消息读取完毕后回复客户端
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 写入并刷新缓冲区(write + flush)
ctx.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes()));
}
  • ByteBuf 是 Netty 对 ByteBuffer 的增强,支持动态扩容、读写指针分离等特性。
  • ReferenceCountUtil.release(msg):Netty 采用引用计数管理 ByteBuf 内存,手动释放避免内存泄漏(示例中已在 channelRead 中处理)。

客户端实现详解

客户端初始化(Client.main

客户端与服务端流程类似,但只需一个线程组,核心操作是连接服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
EventLoopGroup group = new NioEventLoopGroup(); // 客户端单线程组
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class) // 客户端通道类型
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ClientHandler()); // 客户端处理器
}
});

ChannelFuture future = bootstrap.connect("127.0.0.1", 8765).sync(); // 连接服务端
future.channel().closeFuture().sync(); // 阻塞等待连接关闭

区别于服务端

  • 客户端通道类型为 NioSocketChannel(对应 TCP 客户端 socket)。
  • 无需区分 boss/worker 线程组,单一组即可处理连接和 IO。

客户端处理器(ClientHandler

客户端处理器重写两个核心方法:

方法 触发时机 作用
channelActive 与服务端连接建立成功后 向服务端发送 “你好呀” 消息
channelRead 接收到服务端回复消息时 读取并打印服务端回复的 “hello”

代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 连接建立后发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 发送字节数据(Unpooled.copiedBuffer 用于创建 ByteBuf)
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀".getBytes()));
}

// 接收服务端回复
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String response = new String(data, StandardCharsets.UTF_8);
System.out.println("接收到的回复为:->" + response);
}
  • channelActive:连接成功后触发,适合在此发送初始消息(如握手数据)。

核心组件协作机制

  1. ChannelPipeline 职责链
    每个 Channel 对应一个 ChannelPipeline,本质是 ChannelHandlerContext 的双向链表。客户端发送的消息会依次经过 pipeline 中的处理器(如 ServerHandler),处理完成后通过 writeAndFlush 写回客户端。

  2. 事件传播方向

    • 入站事件(如 channelRead):从 HeadContext 流向 TailContext(客户端 → 服务端)。
    • 出站事件(如 write):从 TailContext 流向 HeadContext(服务端 → 客户端)。
  3. 异步操作与 ChannelFuture
    bindconnectwriteAndFlush 等操作均返回 ChannelFuture,通过 addListener 注册回调,避免阻塞线程:

    1
    2
    3
    4
    5
    6
    7
    8
    // 异步处理连接结果(非阻塞)
    bootstrap.connect("127.0.0.1", 8765).addListener(future -> {
    if (future.isSuccess()) {
    System.out.println("连接成功");
    } else {
    System.err.println("连接失败:" + future.cause());
    }
    });

优化与扩展方向

  1. 添加编解码器
    示例中手动转换 ByteBuf 与字符串繁琐且易出错,可使用 Netty 内置编解码器:
1
2
3
// 服务端/客户端 pipeline 中添加
ch.pipeline().addLast(new StringDecoder()); // 自动将 ByteBuf 转为 String
ch.pipeline().addLast(new StringEncoder()); // 自动将 String 转为 ByteBuf

之后可直接在 channelRead 中接收 String 类型消息。

  1. 处理半包 / 粘包
    TCP 传输可能导致消息拆分或合并,需添加帧解码器:

    1
    2
    // 按换行符分割消息(适用于文本协议)
    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  2. 线程安全与资源管理

    • 避免在 ChannelHandler 中创建非线程安全对象(如 SimpleDateFormat),可使用 ThreadLocal 或单例。
    • 确保 finally 块中调用 shutdownGracefully() 释放 EventLoopGroup 资源。
  3. 日志打印
    添加 Netty 日志处理器便于调试:

    1
    ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); // 打印详细通信日志

欢迎关注我的其它发布渠道