11.netty入站与出站(处理器链调用机制)

【README】

  • 1.本文源代码总结自 B站《netty-尚硅谷》;
  • 2.本文部分内容总结自 https://www.baeldung.com/netty
  • 3.本文主要介绍了通道管道中多个入栈处理器与多个出站处理器如何执行?并用代码演示执行顺序;

补充:文末附带了 log4j整合到netty的配置;


【1】事件与处理器

1)概述:

  • netty使用了一种事件驱动的应用范式,因此数据处理的管道是一个经过处理器的事件链。事件和处理器可以关联到 inbound入站 与 outbound出站 数据流。

2)入站事件如下(inbound):

  • channel通道激活与失活(activation and deactivation)
  • 读操作事件
  • 异常事件;
  • 用户事件;

3)出站事件很简单,如下(outbound):

  • 打开与关闭连接;
  • 写出或刷新缓冲区数据

4)netty应用包含许多网络和应用逻辑事件及它们的处理器。

  • 通道事件处理器的基本接口是 ChannelHandler,其子类有 ChannelOutboundHandler 和 ChannelInboundHandler ;

【2】处理器链

1)入站和出栈事件都会经过预设的处理器链(多个处理器);

  • 即入站事件经过 入站处理器;出站事件经过出站处理器;多个处理器形成一个链或管道;

2)处理器举例:

  • 网络传输全是字节形式,而业务逻辑处理是对象形式,所以需要编码器把对象转字节,需要解码器把字节转对象
    • ByteToMessageDecoder 字节转消息(对象)解码器;
    • MessageToByteEncoder 消息(对象)转字节编码器;
  • 业务逻辑处理器(如加工,统计,入库,消息转发等);

3)以客户端服务器模式介绍入站与出站处理器的事件处理过程

【图解】

 客户端的处理器有:

  • 解码处理器;
  • 编码处理器;
  • 客户端业务处理器;

服务端的处理器有:

  • 解码处理器;
  • 编码处理器;
  • 服务器业务处理器;

补充:多个处理器封装到通道管道 ChannelPipeline;


【3】处理器链调用机制代码实现

1)需求描述:

  • 自定义编码器和解码器实现客户端与服务器间的数据传输;

2)通道管道ChannelPipeline 可以封装多个处理器;其处理器执行顺序特别重要(前后关系特别重要,如入栈解码处理器要第1个执行,又如出站编码器要最后一个执行),否则客户端与服务器将无法通信(因为事件或数据要经过所有的处理器);类似于如下:

for (event event : events) {handler1(event);handler2(event);handler3(event);
}

3)入站与出站处理器执行顺序:

3.1)服务器初始化器,添加处理器到管道;

 3.2)客户端初始化器:


  【3.1】服务器

1)服务器:用于监听网络端口,处理请求;

/*** @Description 测试handler链的服务器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class NettyServerForHandlerChain80 {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new NettyServerInitializer()); // 自定义一个初始化类// 自动服务器ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();System.out.println("服务器启动成功");// 监听关闭channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

2)通道初始化器:把多个处理器添加到通道管道;

/*** @Description 初始化器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 入站handler,把字节型数据解码为long型
//        pipeline.addLast(new MyByte2LongDecoder()); // MyByte2LongDecoder 与 MyByte2LongDecoder2 等价pipeline.addLast(new MyByte2LongDecoder2());// 出站handler,  把long型数据编码为字节(编码器)pipeline.addLast(new MyLong2ByteEncoder());// 添加业务逻辑handlerpipeline.addLast(new NettyServerHandler());System.out.println("NettyServerInitializer.initChannel 执行成功.");}
}

3)服务端业务处理器:业务逻辑处理; 

/*** @Description nety服务器handler* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class NettyServerHandler extends SimpleChannelInboundHandler<Long> {// 被调用多次@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);// 给客户端回送消息ctx.writeAndFlush(98765L);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

 【3.2】客户端

1)客户端:建立与服务器的连接;

/*** @Description netty客户端* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class NettyClientForHandlerChain81 {public static void main(String[] args) throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new NettyClientInitializer());  // 自定义一个初始化类// 连接服务器ChannelFuture channelFuture = bootstrap.connect("localhost", 8089).sync();channelFuture.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}
}

2)通道初始化器:添加多个处理器到通道管道

/*** @Description netty客户端初始化器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 出站handler,把long型数据解码为字节型pipeline.addLast(new MyLong2ByteEncoder());// 入站handler,把字节型数据解码为long型pipeline.addLast(new MyByte2LongDecoder());// 添加一个自定义handler(入站),处理业务逻辑pipeline.addLast(new NettyClientHandler());}
}

3)客户端业务处理器:业务逻辑处理 ;

/*** @Description 客户端处理器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class NettyClientHandler extends SimpleChannelInboundHandler<Long> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {System.out.println("得到服务器ip = " + ctx.channel().remoteAddress());System.out.println("收到服务器消息 = " + msg);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("NettyClientHandler 发送数据");// 1 分析// 1.1 abcdefgabcdefgab 是16个字节, long型占8个字节,所以服务器需要解码2次,每次解码8个字节// 1.2 该处理器的前一个handler是 MyLong2ByteEncoder,// 1.3 MyLong2ByteEncoder 的父类是  MessageToByteEncoder// 1.4 MessageToByteEncoder.write()方法通过acceptOutboundMessage判断当前msg是否为要处理的数据类型;//     若不是,则跳过encode方法, 否则执行对应的encode 方法(处理方法)// 客户端发送一个ByteBuf,不走Long型编码器ctx.writeAndFlush(Unpooled.copiedBuffer("abcdefgabcdefgab", StandardCharsets.UTF_8));// 客户端发送一个Long,走Long型编码器
//        ctx.writeAndFlush(123456L); // 发送一个long}
}

【3.3】编码器与解码器处理器

1)字节转 Long型的解码器处理器

/*** @Description 字节转long的解码器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class MyByte2LongDecoder extends ByteToMessageDecoder {/*** @description decode 会根据接收的数据,被调用多次,直到确定没有新元素被添加到list为止, 或者 ByteBuf没有更多的可读字节为止;*              如果list out 不为空,就会将list的内容传递给下一个 ChannelInboundHandler,*              且下一个 ChannelInboundHandler的处理方法也会被调用多次* @param ctx 处 理器上下文* @param in  字节输入缓冲* @param out 集合,把处理后的数据传给下一个 ChannelInboundHandler** @return* @author xiao tang* @date 2022/9/10*/@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("decode解码");// long 8个字节,需要判断有8个字节,才能读取一个longif (in.readableBytes() >= 8) {out.add(in.readLong());}}
}

 一个 解码器 变体;

/*** @Description 字节转long的解码器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class MyByte2LongDecoder2 extends ReplayingDecoder {/*** @description decode 会根据接收的数据,被调用多次,直到确定没有新元素被添加到list为止, 或者 ByteBuf没有更多的可读字节为止;*              如果list out 不为空,就会将list的内容传递给下一个 ChannelInboundHandler,*              且下一个 ChannelInboundHandler的处理方法也会被调用多次* @param ctx 处 理器上下文* @param in  字节输入缓冲* @param out 集合,把处理后的数据传给下一个 ChannelInboundHandler** @return* @author xiao tang* @date 2022/9/10*/@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("MyByte2LongDecoder2被调用, decode解码");// ReplayingDecoder,无需判断字节流是否足够读取,内部会进行处理判断
//        if (in.readableBytes() >= 8) { // 无需判断out.add(in.readLong());
//        }}
}

2)Long型转字节的编码器处理器

/*** @Description long转字节的编码器* @author xiao tang* @version 1.0.0* @createTime 2022年09月10日*/
public class MyLong2ByteEncoder extends MessageToByteEncoder<Long> {@Overrideprotected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {System.out.println("MyLong2ByteEncoder.encode 被调用");System.out.println("MyLong2ByteEncoder msg = " + msg);out.writeLong(msg);}
}

【3.4】运行效果

1)客户端:

decode解码
得到服务器ip = localhost/127.0.0.1:8089
收到服务器消息 = 98765
decode解码
得到服务器ip = localhost/127.0.0.1:8089
收到服务器消息 = 98765

2)服务器:

MyByte2LongDecoder2被调用, decode解码
从客户端/127.0.0.1:56272读取到long7017280452245743457
MyLong2ByteEncoder.encode 被调用
MyLong2ByteEncoder msg = 98765
MyByte2LongDecoder2被调用, decode解码
从客户端/127.0.0.1:56272读取到long7089620625083818338
MyLong2ByteEncoder.encode 被调用
MyLong2ByteEncoder msg = 98765

【4】 log4j整合到 netty

1)引入log4j maven依赖;

 <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>test</scope>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>test</scope>
</dependency>

2)配置log4j;

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%p] %C{1} - %m%n

3)效果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329362.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1688推广工具_全面了解1688数字营销

全面了解1688数字营销什么是数字营销&#xff1f;在数字营销中&#xff0c;有一款很重要的工具-一键推广。在我们的日常经营和大促中&#xff0c;通过一键站外推广可以在各大站外平台实现全店、各产品的推广&#xff0c;尤其在大促中&#xff0c;结合活动当天的站内流量&#x…

Entity Framework升级

第三篇是Entity Framework升级 修改project.json 把原来 EntityFramework 的包 换成 Microsoft.EntityFrameworkCore 版本从 7.0.0-rc1-final 改为 1.0.0-rc2-final 对照表如下&#xff1a; RC1 PackageRC2 EquivalentEntityFramework.MicrosoftSqlServer 7.0.0-rc1-finalMicro…

12.netty中tcp粘包拆包问题及解决方法

【README】 1.本文源代码总结自B站《netty-尚硅谷》&#xff1b;2.本文介绍了tcp粘包拆包问题&#xff1b;3.本文po 出了粘包拆包问题解决方案及源代码实现&#xff1b;【1】tcp粘包拆包问题 refer2 How to deal with the problem of packet sticking and unpacking during T…

Oracle入门(十四.7)之良好的编程习惯

一、目的良好的编程实践是可以遵循的技术来创建最佳代码。 编程实践涵盖了从编写更易读的代码到创建具有更快性能的代码。软件工程团队通常会遵循风格指南&#xff0c;以便团队中的每个人都使用相同的技术。 这使得读取和修改其他人编写的代码变得更加容易。二、编程实践您已经…

css 浏览器调试中不可见_前端入门必会的初级调试技巧

本文仅仅针对前端初学者&#xff0c;目的是【用20%不到的时间】 学会【前端最常用的部分调试技巧】&#xff0c;如果需要最详细的调试技巧&#xff0c;包括调试性能优化的相关知识&#xff0c;文末会补充最全的文档&#xff08;chrome devtool的官方文档链接&#xff09;初学一…

升级ASP.Net Core项目

升级完类库项目&#xff0c;第二篇&#xff0c;我们来升级ASP.Net Core项目 修改global.json与project.json 这里可以参照&#xff0c;升级.Net Core RC2的那些事&#xff08;一&#xff09; 这里补充一点就是如果你觉得这样修改复杂&#xff0c;你完全可以新建一个项目&#x…

gophp解释器_【干货】Gisp 解释器 Golang 辅助开发工具

Gisp 是一个提供给 golang 使用的 Lisp 类 DSL 解释器。在 Lisp 的基本语法基础上&#xff0c;针对 go 环境稍作了一点语法糖。主要目标是提供一个尽可能便于与 golang 互操作的微型DSL工具。简介Gisp用go语言编写&#xff0c;是一个DSL 解释器&#xff0c;这个 DSL 基本上就是…

Oracle入门(十四.8)之迭代控制:基本循环Loop

一、迭代控制&#xff1a;LOOP语句 循环多次重复一个语句或一系列语句。 PL / SQL提供了以下几种类型的循环&#xff1a;•没有全面条件执行重复操作的基本循环 •FOR循环&#xff0c;基于计数器执行迭代操作•WHILE循环根据条件执行重复操作二、基本循环LOOP语句的最简单形式…

phpst安装memcache扩展_在 Ubuntu/Debian 下安装 PHP7.3 教程

介绍最近的 PHP 7.3.0 已经在 2018 年12月6日 发布 GA&#xff0c;大家已经可以开始第一时间体验新版本了&#xff0c;这里先放出 PHP7.3 安装的教程以便大家升级。适用系统&#xff1a; Ubuntu 18.04 LTS / Ubuntu 16.04 LTS &#xff0f; Ubuntu 14.04 LTS / Debian 9 stretc…

升级.Net Core RC1的类库项目

微软终于发布了.Net Code RC2了&#xff0c;作为一个软粉当然是第一时间升级了。《升级.Net Core RC2的那些事》系列文章主要是记录本人升级RC2的相关步骤以及遇到过的坑。 第一篇先写类库项目&#xff08;Nuget包项目&#xff09;的升级 升级VS工具 这里只提供一个下载地址&am…

Oracle入门(十四.9)之迭代控制:WHILE和FOR循环

一、WHILE循环您可以使用WHILE循环重复一系列语句&#xff0c;直到控制条件不再为TRUE。 条件在每次迭代开始时进行评估。当条件为FALSE或NULL时&#xff0c;循环终止。 如果条件在循环开始时为FALSE或NULL&#xff0c;则不会执行进一步的迭代。 WHILE condition LOOPstatement…

为TFS配置跨平台的生成服务器Xplat (Ubuntu Linux)

1. 概述 从TFS 2015开始&#xff0c;微软开始支持跨平台的构建代理。你可以使用TFS的Xplat代理&#xff0c;方便的在基于IOS, Unix和Linux的服务器上搭建生成代理&#xff0c;实现构建、发布等功能。本文档已Ubuntu为例&#xff0c;指导如何安装和运行Xplat代理。 2. 配置TFS的…

分数优先遵循志愿php源码_分数优先 遵循志愿

本报讯 昨日&#xff0c;广东省考试院发布2019年我省普通高校招生平行志愿投档及录取实施办法。今年我省依旧实行普通高校招生平行志愿投档录取模式&#xff0c;按照“分数优先、遵循志愿”的原则&#xff0c;根据考生高考成绩高低排序和院校志愿先后顺序投档&#xff0c;投出…

Oracle入门(十四.10)之显式游标简介

一、上下文区域和游标Oracle服务器分配一个称为上下文区域的私有内存区域来存储由SQL语句处理的数据。 每个上下文区域&#xff08;因此每个SQL语句&#xff09;都有一个与其关联的游标。您可以将游标视为上下文区域的标签&#xff0c;或者将其作为指向上下文区域的指针。 事实…

1.(转)canal背景与工作原理

【README】 1.canal是一个工具&#xff0c;由阿里开源&#xff0c;用于解析mysql的binlog增量日志&#xff0c;重放日志还原出业务数据&#xff0c;下游可以送入 es&#xff0c;mysql&#xff0c;hbase等&#xff1b; 2.本文以下内容转自&#xff1a;GitHub - alibaba/canal:…

Dapper、Entity Framework 和混合应用

你大概注意到了&#xff0c;自 2008 年以来&#xff0c;我写过许多关于 Entity Framework&#xff08;即 Microsoft 对象关系映射器 (ORM)&#xff09;的文章&#xff0c;ORM 一直是主要的 .NET 数据访问 API。市面上还有许多其他 .NET ORM&#xff0c;但是有一个特殊类别因其强…

html让时间只展示年月日_如何用html写代码,使得在网页上显示当前的时间和日期...

展开全部在网页62616964757a686964616fe59b9ee7ad9431333363363537中动态的显示日期时间&#xff0c;一般都是使用js来实现&#xff0c;很简单&#xff0c;一看就会。网页中动态的显示系统日期时间function startTime(){var todaynew Date();//定义日期对象var yyyy today.get…

Oracle入门(十四.11)之使用显式游标属性

一、游标和记录 此示例中的游标基于SELECT语句&#xff0c;该语句仅检索每个表行的两列。 如果它检索了六列或七&#xff0c;八&#xff0c;二十个呢&#xff1f; DECLAREv_emp_id employees.employee_id%TYPE;v_last_name employees.last_name%TYPE;CURSOR emp_cursor ISSEL…

(转 )centos8安装mysql

【1】下载 mysql rpm包 MySQL :: Download MySQL Yum Repositoryhttps://dev.mysql.com/downloads/repo/yum/ 【2】安装mysql 根据官方文档安装&#xff0c;如下&#xff1a; MySQL :: A Quick Guide to Using the MySQL Yum Repositoryhttps://dev.mysql.com/doc/mysql-yu…

IIS负载均衡-Application Request Route详解第一篇: ARR介绍

说到负载均衡&#xff0c;相信大家已经不再陌生了&#xff0c;本系列主要介绍在IIS中可以采用的负载均衡的软件&#xff1a;微软的Application Request Route模块。 其实Application RequestRoute已经有很多文章介绍过了&#xff0c;但是有很多的文档都是英文的&#xff0c;笔者…