云计算百科
云计算领域专业知识百科平台

SpringBoot集成Netty服务器接收大量数据实例

依赖

<dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.68.Final</version> </dependency>  

创建Netty服务类

@Component
@RequiredArgsConstructor
public class NettyServerConfig {

// 定义服务器端口号
private final int port = 13030;

// 注入JsonServerHandler处理器
private final JsonServerHandler jsonServerHandler;

// 初始化方法,在Bean创建时启动Netty服务器
@PostConstruct
public void startNetty() {
new Thread(() -> {
// 创建boss和worker两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建ServerBootstrap实例以配置服务器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定使用NIO传输Channel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 配置通道处理器,包括解码器、编码器和业务处理器
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(jsonServerHandler); // 使用注入的JsonServerHandler
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 设置队列大小
.childOption(ChannelOption.SO_KEEPALIVE, true); // 保持连接

// 绑定端口并启动服务器
ChannelFuture f = bootstrap.bind(port).sync();
// 等待服务器通道关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅地关闭线程组
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}).start();
}
}

创建EchoServerHandler 类接收并存储数据

@Slf4j
@ChannelHandler.Sharable
@Component
public class JsonServerHandler extends SimpleChannelInboundHandler<String> {
@Autowired
private ReceptionMapper receptionMapper;

private final ObjectMapper objectMapper = new ObjectMapper();

// 处理接收到的消息
@Override
@Transactional
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
try {
// 反序列化接收到的JSON消息
ReceptionData receptionData = null;
try {
receptionData = objectMapper.readValue(msg, ReceptionData.class);
} catch (JsonProcessingException e) {
log.error("解码数据错误", e);
}
            //存入数据库
receptionMapper.saveData(receptionData );

} catch (Exception e) {
log.error("接收数据错误", e);
}
}

赞(0)
未经允许不得转载:网硕互联帮助中心 » SpringBoot集成Netty服务器接收大量数据实例
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!