Netty服务器结合WebSocke协议监听和接收数据

目录

  • 1.pom依赖
  • 2.配置属性
  • 3.创建netty服务器
  • 4.建立监听和响应
  • 5.创建启动器
  • 6.前端static下页面
  • 7.前端js
  • 8.注意异常问题
  • 9.创建netty服务器--使用守护线程

1.pom依赖

        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><!-- 根据需要选择版本 --><version>4.1.86.Final</version> </dependency>

2.配置属性

application.properties

#启动端口
server.port=8088
server.servlet.context-path=/rxtxcommon
#logging.level.web=DEBUGnetty.server.port=8023
//根据实际情况分配
netty.server.bossThreads=1
//根据实际情况分配
netty.server.workerThreads=4

3.创建netty服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;import java.util.Objects;
import java.util.concurrent.TimeUnit;@Slf4j
public class NettyServer {private Integer port;private Integer boosThreads = 1;private Integer workerThreads;private Channel channel;private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;public NettyServer(Integer port, Integer boosThreads, Integer workerThreads) throws InterruptedException{this.port = port;this.boosThreads = boosThreads;this.workerThreads = workerThreads;this.init();}private void init() throws InterruptedException {//1.创建一个服务端的引导类ServerBootstrap bootstrap = new ServerBootstrap();//2.创建反应器事件轮询组//boss轮询组(负责处理父通道:连接/接收监听(如NioServerSocketChannel))bossGroup = new NioEventLoopGroup(boosThreads);//workerGroup轮询组(负责处理子通道:读/写监听(如NioSocketChannel))if(Objects.nonNull(this.workerThreads) && this.workerThreads > 0){workerGroup= new NioEventLoopGroup(this.workerThreads);}else{//线程数默认为cpu核心数的2倍workerGroup = new NioEventLoopGroup();}//3.设置父子轮询组bootstrap.group(bossGroup, workerGroup);//4.设置传输通道类型,Netty不仅支持Java NIO,也支持阻塞式的OIObootstrap.channel(NioServerSocketChannel.class);//5.设置监听端口bootstrap.localAddress(new InetSocketAddress(this.port));//6.设置通道参数bootstrap.option(ChannelOption.SO_KEEPALIVE, true);bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);//7.装配子通道的Pipeline流水线bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();//添加对byte数组的编解码,是由netty提供的//        pipeline.addLast(new ByteArrayDecoder()); //入站//        pipeline.addLast(new ByteArrayEncoder()); //出站pipeline.addLast("http-codec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//负责WebSocket协议相关的握手处理和数据帧的编解码。当你创建一个WebSocket服务器时,这个处理器会处理来自客户端的WebSocket握手请求,完成握手过程,并在握手成功后,将连接转换为WebSocket连接。参数 "/websocket" 通常代表WebSocket连接的路径pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));pipeline.addLast("http-chunked", new ChunkedWriteHandler());//添加自定义处理器 pipeline.addLast(new SocketInboundHandler());}});//8.开始绑定端口,并通过调用sync()同步方法阻塞直到绑定成功ChannelFuture future = bootstrap.bind().sync();log.info("服务端启动成功,监听端口:{}", this.port);channel=future.channel();//9.自我阻塞,直到监听通道关闭ChannelFuture closeFuture = future.channel().closeFuture();closeFuture.sync();log.info("服务端已停止运行");//10.释放所有资源,包括创建的反应器线程workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();log.info("已释放服务端占用资源");}@PreDestroypublic void stop() {System.out.println("关闭netty服务器");if (channel != null) {channel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}if (bossGroup != null) {bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}}}

4.建立监听和响应

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class SocketInboundHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//主要用于处理接收数据的某些事件。它支持泛型的消息处理,并在默认情况下,当消息处理完毕后会自动释放。/*** 读取客户端发送来的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回"+textWebSocketFrame.text()));}/*** 接入客户端*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("新客户端连接:{}", ctx.channel().id().asShortText());}/*** 断开客户端*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接断开:{}", ctx.channel().id().asShortText());}/*** 异常处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("客户端异常:{}", cause.getMessage(), cause);ctx.channel().close();}
}

5.创建启动器

@Slf4j
@Component
public class NettyStarter implements ApplicationRunner {
//ApplicationRunner 实现这个接口,不会阻断springboot启动应用线程的阻塞@Value("${netty.server.port}")private Integer port;@Value("${netty.server.bossThreads}")private Integer bossThreads;@Value("${netty.server.workerThreads:-1}")private Integer workerThreads;@Overridepublic void run(ApplicationArguments args) throws Exception {try{new NettyServer(port, bossThreads, workerThreads);}catch (Exception e){log.info("服务端启动异常:{}", e.getMessage(), e);}}
}

6.前端static下页面

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>WebSocket Client</title><script src="sockt.js"></script>
</head>
<body>
<h1>WebSocket Client</h1>
<button onclick="connect()">Connect1</button>
<button onclick="sendMessage()">Send Message1</button>
<button onclick="connect2()">Connect2</button>
<button onclick="sendMessage2()">Send Message2</button>
<ul id="messages"></ul></body>
</html>

7.前端js


var ws;
var ws2;
function connect() {debugger// 假设你的WebSocket服务器运行在以下URL,并且WebSocket的端点是"/websocket"var wsUrl = 'ws://localhost或者ip地址:8023/websocket';// 创建WebSocket连接ws = new WebSocket(wsUrl);// 监听连接打开事件ws.onopen = function(event) {console.log('WebSocket连接已打开');displayMessage('Connected to server.');};// 监听从服务器接收到的消息事件ws.onmessage = function(event) {console.log('收到来自服务器的消息:', event.data);displayMessage('Received: ' + event.data);};// 监听连接关闭事件ws.onclose = function(event) {console.log('WebSocket连接已关闭');displayMessage('Connection closed.');};// 监听连接错误事件ws.onerror = function(error) {console.error('WebSocket发生错误', error);displayMessage('WebSocket Error: ' + error.message);};
}function connect2() {debugger// 假设你的WebSocket服务器运行在以下URL,并且WebSocket的端点是"/websocket"var wsUrl = 'ws://localhost或者ip地址:8023/websocket';// 创建WebSocket连接ws2 = new WebSocket(wsUrl);// 监听连接打开事件ws2.onopen = function(event) {console.log('WebSocket连接已打开');displayMessage('Connected to server.');};// 监听从服务器接收到的消息事件ws2.onmessage = function(event) {console.log('收到服务器2信息:', event.data);displayMessage('收到服务器2信息: ' + event.data);};// 监听连接关闭事件ws2.onclose = function(event) {console.log('WebSocket连接已关闭');displayMessage('Connection closed.');};// 监听连接错误事件ws2.onerror = function(error) {console.error('WebSocket发生错误', error);displayMessage('WebSocket Error: ' + error.message);};
}function sendMessage() {if (ws.readyState === WebSocket.OPEN) {var message = prompt('请输入要发送的消息:');ws.send(message);displayMessage('Sent: ' + message);} else {displayMessage('WebSocket连接未打开,请先连接。');}
}
function sendMessage2() {if (ws2.readyState === WebSocket.OPEN) {var message = prompt('请输入要发送的消息:');ws2.send(message);displayMessage('Sent: ' + message);} else {displayMessage('WebSocket连接未打开,请先连接。');}
}
function displayMessage(message) {var messagesList = document.getElementById('messages');var li = document.createElement('li');li.textContent = message;messagesList.appendChild(li);
}

8.注意异常问题

1.不用InitializingBean接口初始化–阻塞springboot应用线程启动创建
2.不用 @PostConstruct执行初始化方法建立接口绑定–阻塞springboot应用线程启动创建

在类的方法上使用@PostConstruct注解,可以确保该方法在依赖注入完成后立即执行。这也可以作为启动Netty服务器的一个点。但是,需要注意的是,@PostConstruct在单例Bean的初始化阶段执行,可能早于ApplicationRunner或CommandLineRunner。
3.用异步注解@Async–没有尝试过
3.创建netty过程中,启动时用ApplicationRunner --正确
4.创建netty过程中,启动时用CommandLineRunner–正确
无论选择哪种方式,都需要确保Netty服务器的启动不会阻塞Spring Boot的主线程。如果需要,可以考虑使用线程池来管理Netty的IO操作,以避免阻塞。

9.创建netty服务器–使用守护线程

import com.groupname.rxtxcommon.socket.handler.WebSocketHandler2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;@Component
//public class NettyWebSocketServer implements ApplicationRunner {
public class NettyWebSocketServer {private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private Channel channel;private static Thread server;
//    @PostConstruct 这个注解会阻塞springboot主线程运行@PostConstructpublic synchronized void start() {if (server!=null) return;server = new Thread(() -> {bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));// 添加你的自定义HTTP处理器pipeline.addLast(new WebSocketHandler2());}});ChannelFuture future = bootstrap.bind(8023).sync();// 阻塞当前线程,直到服务器Channel关闭System.out.println("开启netty服务器111");channel=future.channel();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();stop();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}});server.setDaemon(true);server.start();}@PreDestroypublic void stop() {System.out.println("关闭netty服务器");if (channel != null) {channel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}if (bossGroup != null) {bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}}//    @Override
//    public void run(ApplicationArguments args) throws Exception {
//        start();
//    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/753124.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring集成hazelcast实现分布式缓存

1.Hazelcast介绍 Hazelcast是Hazelcast公司开源的一款分布式内存数据库产品&#xff0c;提供弹性可扩展、高性能的分布式内存计算。并通过提供诸如Map&#xff0c;Queue&#xff0c;ExecutorService&#xff0c;Lock和JCache等Java的许多开发人员友好的分布式实现。 Hazelcast优…

DM数据库(docker)

docker安装 安装必要的系统工具 yum install -y yum-utils device-mapper-persistent-data lvm2 配置阿里云Docker Yum源: yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo 更新yum缓存 yum makecache fast 安装docker-CE: y…

微机二次消谐装置是怎样运行的

微机二次消谐装置是怎样运行的&#xff1f;微机二次消谐装置是一种用于电力系统中的无功补偿和谐波控制的装置&#xff0c;它采用先进的数字信号处理技术和微机控制技术&#xff0c;能够实现对电力系统中的无功功率和谐波进行快速有效的控制和补偿&#xff0c;以保证电力系统的…

Kotlin入门基础知识

前言 实践是最好的学习方式&#xff0c;技术也如此。 文章目录 前言1、安装 Java 和 Kotlin 环境2、程序基本结构3、数据类型1&#xff09;基本数据类型2&#xff09;布尔类型3&#xff09;字符类型4&#xff09;字符串类型 1、安装 Java 和 Kotlin 环境 2、程序基本结构 fun…

【JavaScript】变量的解构赋值

变量的解构赋值用途很多。 &#xff08;1&#xff09;交换变量的值 let x 1; let y 2;[x, y] [y, x];上面代码交换变量x和y的值&#xff0c;这样的写法不仅简洁&#xff0c;而且易读&#xff0c;语义非常清晰。 &#xff08;2&#xff09;从函数返回多个值 函数只能返回…

Issue 2046:Missing array size check in NewFixedArray

文章目录 环境搭建漏洞分析漏洞触发 漏洞利用总结参考 环境搭建 sudo apt install pythongit reset --hard 64cadfcf4a56c0b3b9d3b5cc00905483850d6559 export DEPOT_TOOLS_UPDATE0 gclient sync -D// debug version tools/dev/v8gen.py x64.debug ninja -C out.gn/x64.debug/…

备战蓝桥杯Day29 - 贪心-活动选择问题

问题描述 假设有n个活动&#xff0c;这些活动要占用同一片场地&#xff0c;而场地在某时刻只能供一个活动使用。 每个活动都有一个开始时间 si 和结束时间 fi (题目中时间以整数表示) ,表示活动在[si, f)区间占用场地。 问:安排哪些活动能够使该场地举办的活动的个数最多? 解…

FDA: 用于语义分割的傅里叶域自适应

论文链接&#xff1a;https://arxiv.org/abs/2004.05498 代码链接&#xff1a;GitHub - YanchaoYang/FDA: Fourier Domain Adaptation for Semantic Segmentation 机构&#xff1a;UCLA 发表于2020CVPR 这篇文章别的地方略读了&#xff0c;主要看看方法&#xff0c;感兴趣自…

【C语言】命令行参数;终止程序

终端命令行参数 On many systems, it is possible to pass arguments to main from a command line by including parameters int argc and char argv[] in the parameter list of main. Parameter argc receives the number of command-line arguments. Parameter argv is an …

如何理解“高频信息/高级语义”和“低频信息/低级语义”?

如何区分高频信息和低频信息&#xff1f; 如果一个东西是高度离散化和语义化的&#xff0c;一个字的差异也可能导致词语之间的含义发生重大变化&#xff0c;就是高频东西。例如一句话&#xff0c;如果你改变了一个单词&#xff0c;这个句子就会变成其他的意思。还有就是经过en…

部署高斯喷射项目gaussian-splatting

硬件要求 支持 CUDA 的 GPU&#xff0c;具有 7.0 的计算能力24 GB VRAM 软件要求 Conda用于 PyTorch 扩展的 C 编译器&#xff08;Visual Studio 2019&#xff09; CUDA SDK 11 for PyTorch 扩展&#xff0c;在 Visual Studio 之后安装C 编译器和 CUDA SDK 必须兼容 拉取源码 …

Poly Kernel Inception Network在遥感检测中的应用

摘要 https://export.arxiv.org/pdf/2403.06258 遥感图像&#xff08;RSI&#xff09;中的目标检测经常面临一些日益严重的挑战&#xff0c;包括目标尺度的巨大变化和多样的上下文环境。先前的方法试图通过扩大骨干网络的空间感受野来解决这些挑战&#xff0c;要么通过大核卷积…

.Net使用ElasticSearch

文章目录 前言主体内容一.Kibana中ElasticSearch的基础操作1.GET&#xff08;查询&#xff09;1.POST&#xff08;新增&#xff09;1.PUT&#xff08;修改&#xff09;1.DELET&#xff08;删除&#xff09; 二.在.Net中&#xff0c;对ElasticSearch进行基础操作1.DotNet连接Ela…

低代码与AI:构建面向未来的智能化应用

引言 在当今数字时代&#xff0c;技术的快速发展为各行各业带来了前所未有的机遇和挑战。企业和组织面临着如何迅速开发和交付高质量应用的需求&#xff0c;同时还需要应对日益复杂的业务需求和用户期望。在这样的背景下&#xff0c;低代码与人工智能&#xff08;AI&#xff0…

输送带的制造工艺

输送带的制造工艺 一、引言 输送带作为现代工业生产中不可或缺的物料运输工具&#xff0c;广泛应用于矿山、冶金、化工、电力、港口、粮食等各个行业。随着科技的发展&#xff0c;输送带的制造工艺也在不断进步&#xff0c;以满足日益增长的生产需求和运输效率。本文将详细介…

Python每日三道经典面试题(十三)

1.Python中的unittest是什么&#xff1f; unittest是Python内置的一个测试框架&#xff0c;也是Python标准库的一部分。它被设计用于支持自动化测试&#xff0c;包括单元测试、集成测试以及一些系统测试。unittest提供了丰富的测试构建、测试用例组织和测试运行功能&#xff0…

蓝桥杯可撤销并查集|查找|合并|撤销(C++)

前置知识 蓝桥杯并查集|路径压缩|合并优化|按秩合并|合根植物(C)-CSDN博客 可撤销并查集 关键注意 可撤销并查集的撤销功能如何实现可撤销并查集能不能用路径压缩 可撤销并查集(Reversible Union-Find)是一种扩展了标准并查集(Union-Find)数据结构的数据结构&#xff0c;它允…

高中数学:指数、对数、幂函数综合(拔高)

一、需要掌握的重要函数 1、第一组&#xff08;记住&#xff09; 例题 1、判断奇偶性 2、代值定象限 2、第二组&#xff08;记住&#xff09; 以下几个函数都是奇函数 3、常用知识点 1、找对称中心或对称轴 上加下减&#xff0c;左加右减 2、奇偶函数组合后的奇偶性 …

Excel数字签名技术总结

Excel数字签名 Excel数字签名【商业化】产品对比&#xff1a; Excel数字签名产品对比冰蓝科技GroupDocsconholdatemesciusaspose官网冰蓝科技 e-iceblue &#xff5c; 您的办公文档开发技术专家 &#xff5c; C#/VB.Net Excel, Word, PowerPoint, PDF, Barcode 组件Document …

京东获得JD商品详情 API

公共参数 名称类型必须描述keyString是免费申请调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&#xff08;包括在请求地址中&#xff09;[item_search,item_get,item_search_shop等]cacheString否[yes,no]默认y…