8.基于netty实现群聊,心跳检测

【README】

1.本文总结自B站《netty-尚硅谷》,很不错;

2.本文po出了 Unpooled创建缓冲区的 代码示例;

3.本文示例代码基于netty实现以下功能:

  • 群聊客户端及服务器;
  • 心跳检测;

【1】Unpooled创建缓冲区

Unpooled定义:

  • 是Netty 提供的一个专门用来操作缓冲区(即Netty的数据容器)的工具

【1.1】Unpooled.buffer-申请给定容量的缓冲区

1)Unpooled.buffer(capacity) 定义:

public static ByteBuf buffer(int initialCapacity) {return ALLOC.heapBuffer(initialCapacity);}

代码示例 :

public class NettyByteBuf61 {public static void main(String[] args) {// 创建一个对象,该对象包含一个数组 byte[10]// 在netty buf中,不需要像nio那样 执行flip 切换读写模式// 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置ByteBuf byteBuf = Unpooled.buffer(10);for (int i = 0; i < 10; i++) {byteBuf.writeByte(i); // writerIndex 自增}// 输出for (int i = 0; i < byteBuf.capacity(); i++) {System.out.printf(byteBuf.readByte() + " "); // readerIndex 自增
//            System.out.println(byteBuf.getByte(i));}// 查看 byteBuf 的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)System.out.println(byteBuf);}
}

【代码解说】

  • 在netty buf中,不需要像nio那样 执行flip 切换读写模式;
  • 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置;

运行结果:

0 1 2 3 4 5 6 7 8 9
UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)


【1.2】Unpooled.copiedBuffer() 创建buf 缓冲区

copiedBuffer(CharSequence string, Charset charset) 定义:

  • 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于NIO中的ByteBuffer但有区别)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset) {if (string == null) {throw new NullPointerException("string");}if (string instanceof CharBuffer) {return copiedBuffer((CharBuffer) string, charset);}return copiedBuffer(CharBuffer.wrap(string), charset);}

代码示例

public class NettyByteBuf62 {public static void main(String[] args) {// 通过 Unpooled.copiedBuffer  创建 buf缓冲区ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", StandardCharsets.UTF_8);// 1 使用相关方法-byteBuf.hasArray()if (byteBuf.hasArray()) {String content = new String(byteBuf.array(), StandardCharsets.UTF_8);System.out.println(content);// 查看ByteBuf的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 24)System.out.println("bytebuf = " + byteBuf);// 查看偏移量System.out.println("byteBuf.arrayOffset() = " + byteBuf.arrayOffset()); // 0// 查看 readerIndexSystem.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex()); // 0// 查看 writerIndexSystem.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex()); // 12// 查看 capacitySystem.out.println("byteBuf.capacity() = " + byteBuf.capacity());// 查看可读取的字节数量 12System.out.println("byteBuf.readableBytes() = " + byteBuf.readableBytes());// 使用for循环读取byteBuffor (int i = 0; i < byteBuf.readableBytes(); i++) {System.out.print((char)byteBuf.getByte(i));}System.out.println();// 读取 byteBuf 其中某一段,从下标4开始,读取6个字节CharSequence charSequence = byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8);System.out.println(charSequence);}}
}

运行结果:

hello world                      
bytebuf = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33)
byteBuf.arrayOffset() = 0
byteBuf.readerIndex() = 0
byteBuf.writerIndex() = 11
byteBuf.capacity() = 33
byteBuf.readableBytes() = 11
hello world
o worl


【2】netty群聊客户端与服务器

需求描述:

  1. 基于Netty 实现 多人群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2.  服务器端:可以监测用户上线,离线,并实现消息转发功能;
  3.  客户端: 通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到);

【2.1】netty服务器

1)群聊服务器代码

/*** @Description netty群聊服务器* @author xiao tang* @version 1.0.0* @createTime 2022年09月03日*/
public class NettyGroupChatServer63 {private int port;public NettyGroupChatServer63(int port) {this.port = port;}public static void main(String[] args) {try {new NettyGroupChatServer63(8089).run();} catch (InterruptedException e) {e.printStackTrace();}}public void run() throws InterruptedException {// 创建两个线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 服务器启动引导对象ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 获取pipelineChannelPipeline pipeline = socketChannel.pipeline();// 添加解码处理器 编码器pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());// 添加业务处理handlerpipeline.addLast(new NettyGroupChatServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();System.out.println("netty服务器启动成功");// 监听关闭channelFuture.channel().closeFuture().sync();} finally {// 优雅关闭线程bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}
}

2)群聊服务器处理器

/*** @Description netty服务器处理器* @author xiao tang* @version 1.0.0* @createTime 2022年09月03日*/
public class NettyGroupChatServerHandler extends SimpleChannelInboundHandler<String> {// 定义一个 channel 组,用于管理channel// GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);// 读取数据并转发@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {// 获取当前channelChannel channel = ctx.channel();// 遍历 channelGroup, 根据不同情况 回送不同消息channelGroup.forEach(otherChannel-> {if (channel != otherChannel) { // 非当前channel, 直接转发otherChannel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户 " + channel.remoteAddress() + "说:" + msg + "\n");} else { // 回显自己发送的消息给自己channel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "自己说:" + msg + "\n");}});}// 一旦连接建立,第一个被执行// 将当前channel 添加到channelGroup@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();// 把客户端加入群组的信息发送到其他客户端channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 加入聊天");// 把当前channel 添加到 channel 组channelGroup.add(channel);}// 表示 channel 处于活动状态, 提示 xx 上线@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 上线了");}// 表示 channel 处于离线状态, 提示 xx 离线@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 离开了");}// 断开连接,把xx客户离开的信息推送给其他在线客户@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 离开了");System.out.println("channelGroup.size() = " + channelGroup.size());}// 发送异常如何处理@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 关闭通道ctx.close();}
}

【2.2】netty客户端

1)群聊客户端代码:

/*** @Description netty群聊客户端* @author xiao tang* @version 1.0.0* @createTime 2022年09月03日*/
public class NettyGroupChatClient64 {/** 主机和端口 */private final String host;private final int port;/*** @description 构造器* @author xiao tang* @date 2022/9/3*/public NettyGroupChatClient64(String host, int port) {this.host = host;this.port = port;}public static void main(String[] args) {try {new NettyGroupChatClient64("127.0.0.1", 8089).run();} catch (InterruptedException e) {e.printStackTrace();}}public void run() throws InterruptedException {// 事件运行的线程池EventLoopGroup eventExecutors = new NioEventLoopGroup();try {// 客户端启动引导对象Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventExecutors).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();// 添加解码器 编码器pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());// 添加业务逻辑的 handlerpipeline.addLast(new NettyGroupChatClietnHandler());}});//  连接给定主机的端口,阻塞直到连接成功ChannelFuture channelFuture = bootstrap.connect(host, port).sync();// 得到 channelChannel channel = channelFuture.channel();System.out.println("----------" + channel.localAddress() + "----------");// 客户端需要输入信息,创建一个扫描器Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String msg = scanner.nextLine();// 通过channel 发送到服务器channel.writeAndFlush(msg);}} finally {// 关闭线程池,释放所有资源,阻塞直到关闭成功eventExecutors.shutdownGracefully().sync();}}
}

2)群聊客户端处理器代码:

/*** @Description netty群聊客户端处理器* @author xiao tang* @version 1.0.0* @createTime 2022年09月03日*/
public class NettyGroupChatClietnHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg.trim());}
}

【2.3】 运行结果:

1)服务器与客户端: 服务器1个,客户端3个;

2)客户端离线:

 


【3】netty心跳检测

【3.1】netty心跳检测概述

1)netty定义的空闲状态事件:

Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
* read, write, or both operation for a while.

当一个通道一段时间内没有执行 读,写,或读写操作时,就会触发 IdleStateEvent事件

2)需求描述:

  1. 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲;
  2. 当服务器超过5秒没有写操作时,就提示写空闲;
  3.  实现当服务器超过7秒没有读或者写操作时,就提示读写空闲;

【3.2】netty心跳检测代码实现

1)netty心跳检测服务器

/*** @Description netty心跳检测服务器* @author xiao tang* @version 1.0.0* @createTime 2022年09月03日*/
public class NettyHeartbeatCheckServer66 {public static void main(String[] args) {try {new NettyHeartbeatCheckServer66().run();} catch (InterruptedException e) {e.printStackTrace();}}public void run() throws InterruptedException {// 创建线程池执行器EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(8);try {// 服务器启动引导对象ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_KEEPALIVE, true).handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加处理器ChannelPipeline pipeline = socketChannel.pipeline();// 1. 添加空闲状态处理器 :// readerIdleTime: 表示多长时间没有读入io事件,就会发送一个心跳检测包,检测是否连接状态// writerIdleTime: 表示多长时间没有写出io事件,就会发送一个心跳检测包,检测是否连接状态// allIdleTime:   表示多长时间没有读入和写出io事件,就会发送一个心跳检测包,检测是否连接状态//  2. 文档说明// Triggers an {@link IdleStateEvent } when a {@link Channel} has not performed//     * read, write, or both operation for a while.// 3. 当 IdleStateEvent 事件触发后, 就会传递给管道的 下一个处理器 去处理// 通过调用下一个handler的 userEventTriggered 方法,即在该方法中处理IdleStateEvent 事件;pipeline.addLast(new IdleStateHandler(4, 5,  7, TimeUnit.SECONDS));// 添加一个对空闲检测 进一步处理的handler(自定义 )pipeline.addLast(new NettyHeartbeatCheckServerHandler());}});// 启动服务器,监听端口,阻塞直到启动成功ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();// 阻塞直到channel关闭channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}
}

2)netty心跳检测服务器处理器

/*** @Description netty心跳检测服务器处理器* @author xiao tang* @version 1.0.0* @createTime 2022年09月04日*/
public class NettyHeartbeatCheckServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event2 = (IdleStateEvent) evt;String eventType = ""; // 事件类型switch (event2.state()) {case READER_IDLE: eventType = "读空闲"; break;case WRITER_IDLE: eventType = "写空闲"; break;case ALL_IDLE: eventType = "读写空闲"; break;}System.out.println("客户端" + ctx.channel().remoteAddress() + "--超时事件--" + eventType);System.out.println("服务器做相应处理");// 如果发生空闲,马上关闭通道
//            System.out.println("一旦发生超时事件,则关闭 channel");
//            ctx.channel().close();}}
}

【3.3】运行结果:

1)以 NettyGroupChatClient64 作为客户端连接到 服务器 NettyHeartbeatCheckServer66;

2)打印结果如下:

// 控制台打印结果
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329373.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Oracle入门(十四.3)之创建PL / SQL块

一、PL / SQL块结构 一个PL / SQL块由三部分组成。PL / SQL块结构部分二、PL / SQL编译器用高级编程语言&#xff08;C&#xff0c;Java&#xff0c;PL / SQL等&#xff09;编写的每个程序都必须经过检查并转换为二进制代码&#xff08;1和0&#xff09;&#xff0c;然后才能执…

[.NET Core].NET Core R2安装及示例教程

前言 前几天.NET Core发布了.NET Core 1.0.1 R2 预览版&#xff0c;之前想着有时间尝试下.NET Core。由于各种原因&#xff0c;就没有初试。刚好&#xff0c;前几天看到.NET Core发布新版本了&#xff0c;决定要去一探究竟。于是乎&#xff0c;就立马去官网查找相关的信息&…

Oracle入门(十四.4)之在PL / SQL中使用变量

一、变量的使用 &#xff08;1&#xff09;使用变量的原因•临时存储数据 •储存值的操作 •可重用性&#xff08;2&#xff09;处理PL / SQL中的变量变量是&#xff1a; •在声明部分声明并初始化 •在可执行部分中使用并分配新值 变量可以是&#xff1a; •作为参数传递给PL …

dismiss的词组_法律英语常用词必记:Dismiss

Dismiss英[dɪsmɪs] 美[dɪsmɪs]【英文释义】v.to refuse to give further hearing to a case in court. If a judge dismisses a court case, they officially decide that the case should not continue【中文释义】v. (1)解雇&#xff1b;免职&#xff1b;开除(2)驳回(起诉…

构建高性能.NET应用之配高可用IIS服务器-第五篇 IIS常见问题之:工作进程回收机制(中)

我们在本篇中接着讲述“工作进程回收机制”。 本篇文章的议题如下&#xff1a; 工作进程回收机制讲解 基于时间的回收机制 基于请求数的回收机制 基于内存使用的回收机制 基于活动状态的回收机制 基于请求数的回收机制 这种基于请求数量回收的机制非常的好理解&#xff1a;当我…

10.netty客户端与服务器使用protobuf传输报文

【README】 本文总结自B站《尚硅谷-netty》&#xff0c;很不错&#xff1b; 内容如下&#xff1a; netty的编码器与解码器&#xff1b;netty客户端与服务器通过 protobuf 传输报文的开发方式&#xff1b;文末po出了所有代码&#xff1b;【1】netty的编码器与解码器 codec 1&…

Oracle入门(十四.5)之识别数据类型

一、PL / SQL数据类型 数据类型指定存储格式&#xff0c;约束和有效的值范围。 PL / SQL支持五类数据类型&#xff1a;二、标量数据类型&#xff08;Scalar&#xff09; &#xff08;1&#xff09;标量数据类型•保持一个值 •没有内部组件 •可以分为四类&#xff1a; - 性格…

TFS2015的CI集成

这篇应该是这个系列的最后一篇了 配置生成代理 配置dotnet cli环境 这步&#xff0c;需要在生成代理的机器上配置cli环境&#xff0c;与本地配置方法一致&#xff0c;可以自行Google 下载及参考地址&#xff1a; https://www.microsoft.com/net/core#windows 配置环境变量 在生…

pivot sqlserver 条件_行转列之SQLSERVERPIVOT与用法详解

在数据库操作中&#xff0c;有些时候我们遇到需要实现“行转列”的需求&#xff0c;例如一下的表为某店铺的一周收入情况表&#xff1a;WEEK_INCOME(WEEK VARCHAR(10),INCOME DECIMAL)我们先插入一些模拟数据&#xff1a;INSERT INTO WEEK_INCOMESELECT 星期一,1000UNION ALLSE…

Oracle入门(十四.6)之使用标量数据类型

一、声明字符变量 字符数据类型包括CHAR&#xff0c;VARCHAR2和LONG。 DECLAREv_emp_job VARCHAR2(9);v_order_no VARCHAR2(6);v_product_id VARCHAR2(10);v_rpt_body_part LONG; …二、声明数字变量 数字数据类型包括NUMBER&#xff0c;PLS_INTEGER&#xff0c;BINARY_INTEGER…

11.netty入站与出站(处理器链调用机制)

【README】 1.本文源代码总结自 B站《netty-尚硅谷》&#xff1b;2.本文部分内容总结自 https://www.baeldung.com/netty3.本文主要介绍了通道管道中多个入栈处理器与多个出站处理器如何执行&#xff1f;并用代码演示执行顺序&#xff1b; 补充&#xff1a;文末附带了 log4j整…

1688推广工具_全面了解1688数字营销

全面了解1688数字营销什么是数字营销&#xff1f;在数字营销中&#xff0c;有一款很重要的工具-一键推广。在我们的日常经营和大促中&#xff0c;通过一键站外推广可以在各大站外平台实现全店、各产品的推广&#xff0c;尤其在大促中&#xff0c;结合活动当天的站内流量&#x…

Entity Framework升级

第三篇是Entity Framework升级 修改project.json 把原来 EntityFramework 的包 换成 Microsoft.EntityFrameworkCore 版本从 7.0.0-rc1-final 改为 1.0.0-rc2-final 对照表如下&#xff1a; RC1 PackageRC2 EquivalentEntityFramework.MicrosoftSqlServer 7.0.0-rc1-finalMicro…

12.netty中tcp粘包拆包问题及解决方法

【README】 1.本文源代码总结自B站《netty-尚硅谷》&#xff1b;2.本文介绍了tcp粘包拆包问题&#xff1b;3.本文po 出了粘包拆包问题解决方案及源代码实现&#xff1b;【1】tcp粘包拆包问题 refer2 How to deal with the problem of packet sticking and unpacking during T…

Oracle入门(十四.7)之良好的编程习惯

一、目的良好的编程实践是可以遵循的技术来创建最佳代码。 编程实践涵盖了从编写更易读的代码到创建具有更快性能的代码。软件工程团队通常会遵循风格指南&#xff0c;以便团队中的每个人都使用相同的技术。 这使得读取和修改其他人编写的代码变得更加容易。二、编程实践您已经…

css 浏览器调试中不可见_前端入门必会的初级调试技巧

本文仅仅针对前端初学者&#xff0c;目的是【用20%不到的时间】 学会【前端最常用的部分调试技巧】&#xff0c;如果需要最详细的调试技巧&#xff0c;包括调试性能优化的相关知识&#xff0c;文末会补充最全的文档&#xff08;chrome devtool的官方文档链接&#xff09;初学一…

升级ASP.Net Core项目

升级完类库项目&#xff0c;第二篇&#xff0c;我们来升级ASP.Net Core项目 修改global.json与project.json 这里可以参照&#xff0c;升级.Net Core RC2的那些事&#xff08;一&#xff09; 这里补充一点就是如果你觉得这样修改复杂&#xff0c;你完全可以新建一个项目&#x…

gophp解释器_【干货】Gisp 解释器 Golang 辅助开发工具

Gisp 是一个提供给 golang 使用的 Lisp 类 DSL 解释器。在 Lisp 的基本语法基础上&#xff0c;针对 go 环境稍作了一点语法糖。主要目标是提供一个尽可能便于与 golang 互操作的微型DSL工具。简介Gisp用go语言编写&#xff0c;是一个DSL 解释器&#xff0c;这个 DSL 基本上就是…

Oracle入门(十四.8)之迭代控制:基本循环Loop

一、迭代控制&#xff1a;LOOP语句 循环多次重复一个语句或一系列语句。 PL / SQL提供了以下几种类型的循环&#xff1a;•没有全面条件执行重复操作的基本循环 •FOR循环&#xff0c;基于计数器执行迭代操作•WHILE循环根据条件执行重复操作二、基本循环LOOP语句的最简单形式…

phpst安装memcache扩展_在 Ubuntu/Debian 下安装 PHP7.3 教程

介绍最近的 PHP 7.3.0 已经在 2018 年12月6日 发布 GA&#xff0c;大家已经可以开始第一时间体验新版本了&#xff0c;这里先放出 PHP7.3 安装的教程以便大家升级。适用系统&#xff1a; Ubuntu 18.04 LTS / Ubuntu 16.04 LTS &#xff0f; Ubuntu 14.04 LTS / Debian 9 stretc…