目录
一、技术选型
二、项目初始化与依赖配置
1.1 创建 Maven 项目
三、服务端实现
2.1 服务端启动类(ChatServer)
2.2 服务端处理器初始化(ChatServerInitializer)
2.3 消息处理器(ChatServerHandler)
四、客户端实现
3.1 客户端启动类(ChatClient)
3.2 客户端处理器初始化(ChatClientInitializer)
3.3 客户端消息处理器(ChatClientHandler)
五、核心功能扩展
4.1 粘包/拆包处理
4.2 心跳检测与连接保活
4.3 群聊功能实现
六、运行与测试
七、完整代码结构
八、关键点总结
九、进阶优化方向
十、Netty 高并发聊天服务器常见问题及解决方案
1. 连接数激增导致内存泄露或 OOM
2. 高并发下消息丢失、顺序错乱
3. 客户端频繁断连(心跳丢失)
4. 线程阻塞导致反应迟钝
5. 粘包/拆包问题
6. 广播消息性能瓶颈
7. 单机承载能力有限
8. SSL/TLS加密导致吞吐下降
一、技术选型
技术 | 用途 |
Netty 4.x | 网络通信 |
Java 8+ | 基础开发 |
Maven | 项目构建 |
二、项目初始化与依赖配置
1.1 创建 Maven 项目
在 pom.xml 中添加 Netty 依赖:
<dependencies>
<!– Netty 核心依赖 –>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version> <!– 使用最新稳定版本 –>
</dependency>
<!– JSON 序列化(可选,用于消息对象转换) –>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
三、服务端实现
2.1 服务端启动类(ChatServer)
服务端启动器
public class ChatServer {
private final int port;
public ChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于处理连接请求
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理数据读写
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer()) // 配置处理器链
.option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小
.childOption(ChannelOption.SO_KEEPALIVE, true); // 保持连接
// 绑定端口并启动服务器
ChannelFuture f = b.bind(port).sync();
System.out.println(\”服务器启动成功,监听端口:\” + port);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new ChatServer(port).run();
}
}
2.2 服务端处理器初始化(ChatServerInitializer)
管道初始化器
public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加编解码器(处理消息拆包/粘包)
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4)); // 发送消息时添加长度字段
// 添加字符串编解码器(可替换为自定义协议)
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加自定义处理器
pipeline.addLast(new ChatServerHandler());
}
}
2.3 消息处理器(ChatServerHandler)
业务逻辑处理
@Sharable
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
// 在线用户列表(简化版,实际应使用线程安全的集合)
private static final Map<String, Channel> onlineUsers = new HashMap<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(\”客户端连接:\” + ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 处理消息逻辑(如登录、聊天、群发等)
if (msg.startsWith(\”LOGIN:\”)) {
String username = msg.split(\”:\”)[1];
onlineUsers.put(username, ctx.channel());
ctx.writeAndFlush(\”登录成功!当前在线用户:\” + onlineUsers.keySet());
} else if (msg.startsWith(\”SEND:\”)) {
String[] parts = msg.split(\”:\”);
String toUser = parts[1];
String content = parts[2];
Channel toChannel = onlineUsers.get(toUser);
if (toChannel != null && toChannel.isOpen()) {
toChannel.writeAndFlush(\”来自 \” + ctx.channel().remoteAddress() + \” 的消息:\” + content);
} else {
ctx.writeAndFlush(\”用户 \” + toUser + \” 不在线!\”);
}
} else {
ctx.writeAndFlush(\”未知指令,请使用 LOGIN:username 或 SEND:toUser:message\”);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.pri
评论前必须登录!
注册