SpringBoot集成Netty实现Ws和Tcp通信

        本教程将指导你如何在 Spring Boot 项目中集成 Netty,实现 WebSocket 和 TCP 通信。以下是详细的步骤和代码示例。  

环境准备

在 你的pom.xml 中添加 Netty 依赖:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version>
</dependency>

Ws通信具体模块

1.初始服务端代码

import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class Init implements ApplicationRunner {public static void serverStart(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();try {serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new IdleStateHandler(12,12,12, TimeUnit.DAYS));pipeline.addLast(new HttpObjectAggregator(1024*64));pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));pipeline.addLast(new MsgHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {if (channelFuture1.isSuccess()) {log.info("Websocket启动成功,端口:{}", port);}else {log.warn("Websocket启动失败,端口:{}", port);}});channelFuture.channel().closeFuture().sync();} catch (Exception e) {throw new RuntimeException(e);}finally {bossGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}@Overridepublic void run(ApplicationArguments args)  {serverStart(7309);}
}

2.信息处理器

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class MsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void handlerAdded(ChannelHandlerContext ctx){channelGroups.add(ctx.channel());SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");ChannelId id = ctx.channel().id();CHANNEL.put(id, ctx.channel());log.info("客服端:{} 上线了!",id);ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){cause.printStackTrace();ChannelId id = ctx.channel().id();CHANNEL.remove(id);channelGroups.remove(ctx.channel());log.info("客服端:{} 异常断开!",id);ctx.close();}@Overridepublic void channelInactive(ChannelHandlerContext ctx){channelGroups.remove(ctx.channel());log.info("客服端:{} 断开连接!",ctx.channel().id());CHANNEL.remove(ctx.channel().id());ctx.close();}@Overrideprotected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {if (!CHANNEL.containsKey(ctx.channel().id())) { CHANNEL.put(ctx.channel().id(), ctx.channel());}String msg = textWebSocketFrame.text();log.info("客服端:{} 发送消息:{}", ctx.channel().id(), msg );ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端收到您发送的信息:" + msg));}
}

3.测试用例 

测试案例
Ws测试用例

WebSocket测试网站http://wstool.js.org/

Tcp通信具体模块

1.初始服务端代码

import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class Init implements ApplicationRunner {public static void serverStart(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();try {serverBootstrap.group(bossGroup,workerGroup)// 添加通道设置非阻塞.channel(NioServerSocketChannel.class)// 服务端可连接队列数量.option(ChannelOption.SO_BACKLOG, 128)// 开启长连接.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)// 流程处理.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new MsgHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {if (channelFuture1.isSuccess()) {log.info("TcpServer启动成功,端口:{}", port);}else {log.error("TcpServer启动失败,端口:{}", port);}});channelFuture.channel().closeFuture().sync();} catch (Exception e) {throw new RuntimeException(e);}finally {bossGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}@Overridepublic void run(ApplicationArguments args)  {serverStart(7311);}
}

2.信息处理器代码

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class MsgHandler extends ChannelInboundHandlerAdapter  {public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void handlerAdded(ChannelHandlerContext ctx)  {channelGroups.add(ctx.channel());SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));ChannelId id = ctx.channel().id();CHANNEL.put(id, ctx.channel());log.info("客服端:{} 上线了!",id);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();channelGroups.remove(ctx.channel());log.info("客服端:{} 异常!",ctx.channel().id());CHANNEL.remove(ctx.channel().id());ctx.close();}@Overridepublic void channelInactive(ChannelHandlerContext ctx)  {channelGroups.remove(ctx.channel());log.info("客服端:{} 断开连接!",ctx.channel().id());CHANNEL.remove(ctx.channel().id());ctx.close();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof ByteBuf byteBuf) {// 将 ByteBuf 转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);log.info("客服端:{} 发送消息:{}", ctx.channel().id(), message);ctx.channel().writeAndFlush(Unpooled.copiedBuffer("服务端收到您发送的信息:" + message, CharsetUtil.UTF_8));} else {log.info("客服端:{} 发送未知类型的消息:{}", ctx.channel().id(), msg);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/897139.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

与中国联通技术共建:通过obdiag分析OceanBase DDL中的报错场景

中国联通软件研究院&#xff08;简称联通软研院&#xff09;在全面评估与广泛调研后&#xff0c;在 2021年底决定采用OceanBase 作为基础&#xff0c;自研分布式数据库产品CUDB&#xff08;即China Unicom Database&#xff0c;中国联通数据库&#xff09;。目前&#xff0c;该…

机器学习-随机森林解析

目录 一、.随机森林的思想 二、随机森林构建步骤 1.自助采样 2.特征随机选择 3构建决策树 4.集成预测 三. 随机森林的关键优势 ​**(1) 减少过拟合** ​**(2) 高效并行化** ​**(3) 特征重要性评估** ​**(4) 耐抗噪声** 四. 随机森林的优缺点 ​优点 ​缺点 五.…

深度集成DeepSeek,智问BI@GPT引领商业智能“深度思考“革命

当下传统的数据分析工具如同显微镜&#xff0c;虽然能帮助我们看到数据的细节&#xff0c;却难以揭示数据背后的深层规律。亿信华辰最新升级的智问BIGPT产品&#xff0c;通过深度集成DeepSeek大模型&#xff0c;首次在商业智能领域实现了"深度思考"功能。这项突破性创…

Mysql安装方式

方式一&#xff1a;安装包安装 下载安装包 官网直接下载&#xff1a;https://dev.mysql.com/downloads/ 安装配置 2.1、双击刚刚下载好的msi文件&#xff0c;开始安装MySQL。 2.2、选择自定义模式Custom安装 2.3、点击选择自己电脑对应的mysql安装目录 2.5、继续点击下一步&…

unity调用本地部署deepseek全流程

unity调用本地部署deepseek全流程 deepseek本地部署 安装Ollama 搜索并打开Ollama官网[Ollama](https://ollama.com/download) 点击Download下载对应版本 下载后点击直接安装 安装deepseek大语言模型 官网选择Models 选择deepseek-r1&#xff0c;选择对应的模型&#xff0…

Linux - 网络基础(应用层,传输层)

一、应用层 1&#xff09;发送接收流程 1. 发送文件 write 函数发送数据到 TCP 套接字时&#xff0c;内容不一定会立即通过网络发送出去。这是因为网络通信涉及多个层次的缓冲和处理&#xff0c;TCP 是一个面向连接的协议&#xff0c;它需要进行一定的排队、确认和重传等处理…

wxWidgets GUI 跨平台 入门学习笔记

准备 参考 https://wiki.wxwidgets.org/Microsoft_Visual_C_NuGethttps://wiki.wxwidgets.org/Tools#Rapid_Application_Development_.2F_GUI_Buildershttps://docs.wxwidgets.org/3.2/https://docs.wxwidgets.org/latest/overview_helloworld.htmlhttps://wizardforcel.gitb…

使用joblib 多线程/多进程

文章目录 1. Joblib 并行计算的两种模式多进程(Multiprocessing,适用于 CPU 密集型任务)多线程(Multithreading,适用于 I/O 密集型任务)2. Joblib 的基本用法3. Joblib 多进程示例(适用于 CPU 密集型任务)示例:计算平方4. Joblib 多线程示例(适用于 I/O 密集型任务)…

神旗视讯Linux client 3.4版本发布和开源

在国产化替代的大潮中&#xff0c;神旗视讯推出专为统信 Linux、麒麟 Linux OS 打造打造的开源视频会议客户端&#xff0c;全面适配国产 x86 及 arm64 架构 CPU&#xff0c;以稳定、安全、灵活的特性&#xff0c;为国产操作系统用户带来前所未有的高效沟通体验&#xff0c;同时…

HCIA-IP路由动态-RIP

一、概念 动态路由是指路由器通过运行动态路由协议&#xff08;RIP、OSPF等&#xff09;&#xff0c;自动学习和发现网络中的路由信息。路由器之间通过交换路由协议数据包&#xff0c;互相通告自己所知道的网络信息&#xff0c;从而构建和更新路由表。 二、RIP(路由信息协议)…

VEC系列-RabbitMQ 入门笔记

消息队列&#xff08;MQ&#xff09;对于开发者来说是一个经常听到的词汇&#xff0c;但在实际开发中&#xff0c;大多数人并不会真正用到它。网上已经有很多关于 MQ 概述和原理的详细讲解&#xff0c;官网文档和技术博客也都介绍得很深入&#xff0c;因此&#xff0c;我在这里…

js中??是什么意思

在 JavaScript 中&#xff0c;?? 是一个逻辑运算符&#xff0c;称为 空值合并运算符&#xff08;Nullish Coalescing Operator&#xff09;。它用于检查左侧的值是否为 null 或 undefined&#xff0c;如果是&#xff0c;则返回右侧的值&#xff1b;否则返回左侧的值。 语法 …

常见限流算法

限流是指在高并发、大流量请求的情况下&#xff0c;限制新的流量对系统的访问&#xff0c;以保证系统服务的安全性。常见的限流算法及其详细介绍如下&#xff1a; 计数器算法&#xff08;Fixed Window Counter&#xff09; 原理&#xff1a;使用一个固定时间窗口内的计数器来…

YOLOv12本地部署教程——42%速度提升,让高效目标检测触手可及

YOLOv12 是“你只看一次”&#xff08;You Only Look Once, YOLO&#xff09;系列的最新版本&#xff0c;于 2025 年 2 月发布。它引入了注意力机制&#xff0c;提升了检测精度&#xff0c;同时保持了高效的实时性能。在保持速度的同时&#xff0c;显著提升了检测精度。例如&am…

【原创】C# HttpClient 读取流数据的问题

默认情况下HttpClient中有缓存&#xff0c;在读取流数据的时候&#xff0c;往往要等一小会儿&#xff0c;然后读出一大堆。 我们在请求OpenAI类的大模型的时候&#xff0c;往往要一边读取一边显示&#xff08;输出&#xff09;&#xff0c;这时候需要禁止HttpClient 中内置的缓…

能源行业标杆:信创系统在智能电网中的3个创新应用案例

在当今数字化浪潮汹涌澎湃的时代&#xff0c;信息技术应用创新&#xff08;信创&#xff09;已成为推动我国经济社会发展的重要引擎。智能电网作为能源行业的核心领域&#xff0c;其信息化建设对于保障国家能源安全和促进能源转型具有重要意义。今天&#xff0c;让我们一同探索…

AcWing 蓝桥杯集训·每日一题2025·5526. 平衡细菌

5526. 平衡细菌 题意 给定一个序列 ( a i ) (a_i) (ai​)&#xff0c;每次操作可以选择一个位置 (p)&#xff0c;令从 ( a p ) (a_p) (ap​) 开始的每个数都加上一个以 (1) 或者 (-1) 为公差的从 ( 1 / − 1 ) (1 / -1) (1/−1) 开始的等差数列。求最小化让序列归零的操作…

PTA 7-6 列出连通集

题目详情&#xff1a; 给定一个有 n 个顶点和 m 条边的无向图&#xff0c;请用深度优先遍历&#xff08;DFS&#xff09;和广度优先遍历&#xff08;BFS&#xff09;分别列出其所有的连通集。假设顶点从 0 到 n−1 编号。进行搜索时&#xff0c;假设我们总是从编号最小的顶点出…

ES中数据刷新策略refresh

在 Elasticsearch 中&#xff0c;插入数据时的 refresh 参数控制文档在写入后何时对搜索可见&#xff0c;其行为直接影响数据可见性和系统性能。以下是 refresh 参数的三个可选值&#xff08;true、false、wait_for&#xff09;的详细说明及适用场景&#xff1a; 1. refreshtr…

用Python的Pandas库解锁数据科学:从入门到实战

用Python的Pandas库解锁数据科学&#xff1a;从入门到实战 引言 Python的Pandas库&#xff08;名称源自"Panel Data"&#xff09;作为数据科学生态系统的基石&#xff0c;凭借其强大的数据结构和灵活的操作功能&#xff0c;已成为全球超过90%数据工作者的首选工具。…