Springboot整合Netty简单实现1对1聊天(vx小程序服务端)

本文功能实现较为简陋,demo内容仅供参考,有不足之处还请指正。

背景

一个小项目,用于微信小程序的服务端,需要实现小程序端可以和他人1对1聊天

实现功能

Websocket、心跳检测、消息持久化、离线消息存储

Netty配置类

/*** @author Aseubel*/
@Component
@Slf4j
@EnableConfigurationProperties(NettyServerConfigProperties.class)
public class NettyServerConfig {private ChannelFuture serverChannelFuture;// 心跳间隔(秒)private static final int HEARTBEAT_INTERVAL = 15;// 读超时时间private static final int READ_TIMEOUT = HEARTBEAT_INTERVAL * 2;// 使用线程池管理private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);private final EventLoopGroup workerGroup = new NioEventLoopGroup();private final NettyServerConfigProperties properties;// 由于在后面的handler中有依赖注入类,所以要通过springboot的ApplicationContext来获取Bean实例@Autowiredprivate ApplicationContext applicationContext;public NettyServerConfig(NettyServerConfigProperties properties) {this.properties = properties;}@PostConstructpublic void startNettyServer() {// 使用独立线程启动Netty服务new Thread(() -> {try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();SSLContext sslContext = SslUtil.createSSLContext("PKCS12",properties.getSslPath(), properties.getSslPassword());// SSLEngine 此类允许使用ssl安全套接层协议进行安全通信SSLEngine engine = sslContext.createSSLEngine();engine.setUseClientMode(false);pipeline.addLast(new SslHandler(engine)); // 设置SSLpipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024 * 1024));// 最大10MBpipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpHandler());// 只有text和binarytext的帧能经过WebSocketServerProtocolHandler,所以心跳检测这两个都得放前面pipeline.addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HeartbeatHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024 * 1024));pipeline.addLast(applicationContext.getBean(MessageHandler.class));pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 统一处理所有未被前面handler捕获的异常log.error("全局异常捕获: {}", cause.getMessage());ctx.channel().close();}});}});serverChannelFuture = bootstrap.bind(properties.getPort()).sync();// 保持通道开放serverChannelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}@PreDestroypublic void stopNettyServer() {// 优雅关闭if (serverChannelFuture != null) {serverChannelFuture.channel().close();}bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}

Handler

心跳检测

/*** @author Aseubel*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {private static final int HEARTBEAT_INTERVAL = 15; // 心跳间隔(秒)private static final int MAX_MISSED_HEARTBEATS = 2; // 允许丢失的心跳次数// 记录每个连接的丢失心跳次数private final Map<ChannelId, Integer> missedHeartbeats = new ConcurrentHashMap<>();@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 添加 IdleStateHandler 触发读空闲事件ctx.pipeline().addLast(new IdleStateHandler(HEARTBEAT_INTERVAL * MAX_MISSED_HEARTBEATS, 0, 0));scheduleHeartbeat(ctx);}private void scheduleHeartbeat(ChannelHandlerContext ctx) {ctx.executor().scheduleAtFixedRate(() -> {if (ctx.channel().isActive()) {ctx.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)));// 记录丢失的心跳次数missedHeartbeats.compute(ctx.channel().id(), (k, v) -> v == null ? 1 : v + 1);}}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof PongWebSocketFrame) {// 收到 Pong 后重置丢失计数missedHeartbeats.remove(ctx.channel().id());ctx.fireChannelRead(msg); // 传递消息给后续处理器} else {ctx.fireChannelRead(msg);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {int missed = missedHeartbeats.getOrDefault(ctx.channel().id(), 0);if (missed >= MAX_MISSED_HEARTBEATS) {// 超过最大丢失次数,关闭连接System.out.println("连接超时,关闭连接" + ctx.channel().id().asLongText());ctx.close();cleanOfflineResources(ctx.channel());}}}private void cleanOfflineResources(Channel channel) {MessageHandler.removeUserChannel(channel);missedHeartbeats.remove(channel.id());}
}

处理http请求,建立连接

/*** @author Aseubel* @description 处理websocket连接请求,将code参数存入channel的attribute中* @date 2025-02-21 15:34*/
public class HttpHandler extends ChannelInboundHandlerAdapter {public static final AttributeKey<String> WS_TOKEN_KEY = AttributeKey.valueOf("code");@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 判断是否是连接请求if (msg instanceof FullHttpRequest) {FullHttpRequest request = (FullHttpRequest) msg;try {QueryStringDecoder decoder = new QueryStringDecoder(request.uri());ctx.channel().attr(WS_TOKEN_KEY).set(decoder.parameters().get("code").get(0));} catch (Exception e) {throw new AppException("非法的websocket连接请求");}// 将 FullHttpRequest 转发到 MessageHandlerctx.fireChannelRead(request);// 重新设置 uri,将请求转发到 websocket handler,否则无法成功建立连接request.setUri("/ws");}// 消息直接交给下一个 handlersuper.channelRead(ctx, msg);}}

 消息处理

/*** @author Aseubel* @description 处理 WebSocket 消息* @date 2025-02-21 15:33*/
@Component
@Slf4j
@Sharable
public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {public static final AttributeKey<String> WS_TOKEN_KEY = AttributeKey.valueOf("code");public static final AttributeKey<String> WS_USER_ID_KEY = AttributeKey.valueOf("userId");private static final Map<String, Queue<WebSocketFrame>> OFFLINE_MSGS = new ConcurrentHashMap<>();private static final Map<String, Channel> userChannels = new ConcurrentHashMap<>();@Autowiredprivate ThreadPoolTaskExecutor threadPoolExecutor;@Resourceprivate IMessageRepository messageRepository;// 提供受控的访问方法public static void removeUserChannel(Channel channel) {userChannels.values().remove(channel);}public static boolean containsUser(String userId) {return userChannels.containsKey(userId);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object req) throws Exception {if (req instanceof FullHttpRequest) {String code = getCodeFromRequest(ctx); // 从请求中提取 codeString userId = getOpenid(APPID, SECRET, code);    // 验证 code 获取 openiduserChannels.put(userId, ctx.channel());ctx.channel().attr(WS_USER_ID_KEY).set(userId);System.out.println("客户端连接成功,用户id:" + userId);// 由于这里还在处理握手请求也就是建立连接,所以需要延迟发送离线消息new Thread(() -> {try {Thread.sleep(50);OFFLINE_MSGS.getOrDefault(userId, new LinkedList<>()).forEach(ctx::writeAndFlush);OFFLINE_MSGS.remove(userId);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();} else if (req instanceof TextWebSocketFrame ) {this.channelRead0(ctx, (TextWebSocketFrame) req);} else {ctx.fireChannelRead(req);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {if (frame instanceof TextWebSocketFrame) {MessageEntity message = validateMessage(ctx.channel().attr(WS_USER_ID_KEY).get(), (TextWebSocketFrame) frame);saveMessage(message);sendOrStoreMessage(message.getToUserId(), frame);} else {ctx.close();}}// 处理连接断开@Overridepublic void channelInactive(ChannelHandlerContext ctx) {System.out.println("客户端断开连接,用户id:" + ctx.channel().attr(WS_USER_ID_KEY).get());Channel channel = ctx.channel();for (Map.Entry<String, Channel> entry : userChannels.entrySet()) {if (entry.getValue() == channel) {userChannels.remove(entry.getKey());break;}}}private MessageEntity validateMessage(String userId, TextWebSocketFrame textFrame) {String message = textFrame.text();try {JsonObject json = JsonParser.parseString(message).getAsJsonObject();String toUserId = json.get("toUserId").getAsString();String content = json.get("content").getAsString();String type = json.get("type").getAsString();if (type.equals("text") || type.equals("image")) {return new MessageEntity(userId, toUserId, content, type);} else {throw new AppException("非法的消息类型!");}} catch (Exception e) {throw new AppException("非法的消息格式!");}}private void sendOrStoreMessage(String toUserId, WebSocketFrame message) {if (isUserOnline(toUserId)) {Channel targetChannel = userChannels.get(toUserId);if (targetChannel != null && targetChannel.isActive()) {targetChannel.writeAndFlush(message.retain());}} else {// 存储原始WebSocketFrame(需保留引用)OFFLINE_MSGS.computeIfAbsent(toUserId, k -> new LinkedList<>()).add(message.retain());}}private void saveMessage(MessageEntity message) {threadPoolExecutor.execute(() -> {messageRepository.saveMessage(message);});}private boolean isUserOnline(String userId) {return userChannels.containsKey(userId);}private String getCodeFromRequest(ChannelHandlerContext ctx) {String code = ctx.channel().attr(WS_TOKEN_KEY).get();// 检查 code 参数是否存在且非空if (code == null || code.isEmpty()) {throw new IllegalArgumentException("WebSocket token  is missing or empty");}return code;}private String getOpenid(String appid, String secret, String code) {Map<String, String> paramMap = new HashMap<>();paramMap.put("appid", appid);paramMap.put("secret", secret);paramMap.put("js_code", code);paramMap.put("grant_type", "authorization_code");String result = HttpClientUtil.doGet(WX_LOGIN, paramMap);//获取请求结果JSONObject jsonObject = JSON.parseObject(result);String openid = jsonObject.getString("openid");//判断openid是否存在if (StringUtils.isEmpty(openid)) {throw new WxException(jsonObject.getString("errcode"), jsonObject.getString("errmsg"));}return openid;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof AppException appCause) {log.error("AppException caught: {}", appCause.getInfo());} else if (cause instanceof WxException wxCause) {log.error("WxException caught: {}", wxCause.getMessage());} else {log.error("Exception caught: {}", cause.getMessage(), cause);}ctx.close(); // 建议关闭发生异常的连接}}

连接及消息格式:

wss://127.0.0.1:21611/ws?code=xxxxxx{"toUserId": "1001","type": "text","content": "Hello World!"
}

规定了type只有text和image两种,text为文本content,image则为Base64编码格式

本文功能实现较为简陋,demo内容仅供参考,可能有注释错误或设计不合理的地方

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/75607.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GitLab 中文版17.10正式发布,27项重点功能解读【二】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…

好消息!软航文档控件(NTKO WebOffice)在Chrome 133版本上提示扩展已停用的解决方案

软航文档控件现有版本依赖Manifest V2扩展技术支持才能正常运行&#xff0c;然而这个扩展技术到2025年6月在Chrome高版本上就彻底不支持了&#xff0c;现在Chrome 133开始的版本已经开始弹出警告&#xff0c;必须手工开启扩展支持才能正常运行。那么如何解决这个技术难题呢&…

字典树与01trie

字典树简介 当我们通过字典查一个字或单词的时候&#xff0c;我们会通过前缀或关键字的来快速定位一个字的位置&#xff0c;进行快速查找。 字典树就是类似字典中索引表的一种数据结构&#xff0c;能够帮助我们快速定位一个字符串的位置。 字典树是一种存储字符串的数据结构…

二十五、实战开发 uni-app x 项目(仿京东)- 前后端轮播图

定义了一个名为 Swiper 的Java类,用于表示一个轮播图实体。它使用了 Jakarta Persistence API (JPA) 来映射数据库表,并使用了 Lombok 库来简化代码。以下是对代码的详细讲解: 1. 包声明 package com.jd.jdmall.model; 这行代码声明了该类所在的包路径为 com.jd.jdmall.mode…

游戏摇杆开发:利用 Windows API 实现摇杆输入捕获

在现代游戏开发中&#xff0c;游戏摇杆&#xff08;Joystick&#xff09;作为一种重要的输入设备&#xff0c;能够为玩家提供更加沉浸式的游戏体验。Windows 操作系统提供了一系列 API 函数&#xff0c;允许开发者轻松地捕获和处理游戏摇杆的输入。本文将介绍如何使用 Windows …

Ceph集群2025(Squid版)快速对接K8S cephFS文件存储

ceph的块存储太简单了。所以不做演示 查看集群 创建一个 CephFS 文件系统 # ceph fs volume create cephfs01 需要创建一个子卷# ceph fs subvolume create cephfs01 my-subvol -----------------#以下全部自动创建好 # ceph fs ls name: cephfs01, metadata pool: c…

Python中数据结构元组详解

在Python中&#xff0c;元组&#xff08;Tuple&#xff09;是一种不可变的序列类型&#xff0c;常用于存储一组有序的数据。与列表&#xff08;List&#xff09;不同&#xff0c;元组一旦创建&#xff0c;其内容无法修改。本文将详细介绍元组的基本操作、常见运算、内置函数以及…

游戏引擎学习第183天

回顾和今天的计划 我对接下来的进展感到非常兴奋。虽然我们可能会遇到一些问题&#xff0c;但昨天我们差不多完成了将所有内容迁移到新的日志系统的工作&#xff0c;我们正在把一些内容整合进来&#xff0c;甚至是之前通过不同方式记录时间戳的旧平台层部分&#xff0c;现在也…

Spring 如何处理循环依赖

在 Spring 框架里&#xff0c;循环依赖指的是多个 Bean 之间相互依赖&#xff0c;从而形成一个闭环。例如&#xff0c;Bean A 依赖 Bean B&#xff0c;而 Bean B 又依赖 Bean A。Spring 主要通过三级缓存机制来处理循环依赖&#xff0c;下面详细介绍相关内容。 1. 三级缓存的定…

Android开发layer-list

Android开发layer-list 它的用处可以在drawable上进行多图拼接&#xff0c;比如启动页&#xff0c;不想图片被拉伸就这么做。还有做某些线突出来。 示例代码&#xff1a; <?xml version"1.0" encoding"utf-8"?> <layer-list xmlns:android&q…

手机测试,工作中学习

要学习各种机型的截图方式、开发模式在哪。 荣耀机型&#xff1a;截图&#xff1a;关节快速敲两下。开发者模式在“系统和更新”里。 1.出现缺陷&#xff0c;需要获取日志。 学习adb生成日志&#xff1a;当测试中出现缺陷的&#xff0c;使用adb logcat -d > d:/log.txt …

OBS虚拟背景深度解析:无需绿幕也能打造专业教学视频(附插件对比)

想要录制教学视频却苦于背景杂乱&#xff1f;本文将手把手教你用OBS实现专业级虚拟背景效果&#xff0c;无需绿幕也能轻松营造沉浸式教学场景。文末附6个提升画面质感的免费背景资源&#xff01; 一、虚拟背景的核心价值&#xff1a;从「教师宿舍」到「虚拟讲堂」的蜕变 我们调…

零基础搭建智能法律知识库!腾讯云HAI实战教程

为什么需要法律知识库&#xff1f; 想象一下&#xff0c;你的所有法律文件都在手边&#xff0c;随时可以搜索和分析。这就是法律知识库的魅力所在。对于法律专业人士、处理大量法律文档的企业&#xff0c;甚至是希望了解法律事项的普通人来说&#xff0c;法律知识库都是一个不…

Rust从入门到精通之进阶篇:19.Rust 生态系统

Rust 生态系统 Rust 拥有一个丰富而活跃的生态系统&#xff0c;提供了各种库和框架来支持不同领域的开发。在本章中&#xff0c;我们将探索 Rust 生态系统中的主要组件&#xff0c;了解常用的库和工具&#xff0c;以及如何在项目中有效地使用它们。 Rust 包管理&#xff1a;C…

前端面试:如何去衡量用户操作过程中否卡顿?

衡量用户在应用中的操作是否卡顿是前端开发中的一个关键任务&#xff0c;涉及用户体验的各个方面。以下是一些常用的方法和工具&#xff0c;可以帮助你有效地评估用户操作中的卡顿情况&#xff1a; 1. 使用性能分析工具 浏览器开发者工具&#xff1a;大多数现代浏览器&#xf…

Python技术栈与数据可视化创意实践详解(三)

Python在数据可视化领域凭借丰富的库和灵活的生态系统&#xff0c;能够实现从基础图表到复杂交互式可视化的全场景覆盖。以下从技术选型、创意实现到实战优化进行系统化解析&#xff0c;并提供可直接落地的代码示例。 一、Python数据可视化技术栈 1. 基础与统计可视化 Matplotl…

订票系统|基于Java+vue的火车票订票系统(源码+数据库+文档)

订票系统目录 基于Springbootvue的火车票订票系统 一、前言 二、系统设计 三、系统功能设计 1会员信息管理 2 车次信息管理 3订票订单管理 4留言板管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍…

Snowflake 算法的实现

snowflake(雪花算法)是一个开源的分布式 ID 生成算法&#xff0c;结果是一个 long 型的 ID。snowflake 算法将 64bit 划分为多段&#xff0c;分开来标识机器、时间等信息&#xff0c;具体组成结构如下图所示&#xff1a; snowflake 算法的核心思想是使用 41bit 作为毫秒数&…

C 语言中, scanf 函数在哪些情况下会结束输入读取:

在 C 语言中&#xff0c; scanf 函数在以下几种情况下会结束输入读取&#xff1a; &#xff1a; 1. 遇到指定格式匹配失败&#xff1a; scanf 按照格式字符串要求读取输入。当输入数据格式与格式字符串不匹配时&#xff0c;就会结束读取。例如 scanf(“%d”, &num) 要求输…

括号合法题

一、括号合法题 2116. 判断一个括号字符串是否有效 //采用从左往右和从右往左遍历的贪心算法&#xff0c;分别保证前缀合法&#xff0c;后缀合法。public boolean canBeValid(String s, String locked) {int ns.length();if (n%21) return false;int num0;// 从左到右扫描&…