【README】
1.本文总结自 B站 《尚硅谷-netty》;
2.NioEventLoop实际上是一个提交到线程池的Runnable任务,在while无限循环中运行 taskQueue中的任务(串行);
【1】提交任务到NioEventLoop
1)NioEventLoop:
- 表示一个不断循环执行的Runnable任务,每个NioEventLoop都有一个selector,用于监听绑定在其上的 socket 网络通道;
2)NioEventLoop 内部采用串行化设计:
- 从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责;
3)NioEventLoopGroup 下包含多个 NioEventLoop ;
- 每个 NioEventLoop 中包含有一个 Selector,一个taskQueue ;
- 每个 NioEventLoop 的 Selector 上可以注册监听多个NioChannel ;
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上;
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline ;
4)任务提交到 NioEventLoop 后,实际会添加到 taskQueue ;
- taskQueue的访问路径如下:
- ChannelHandlerContext ->
- DefaultChannelPipeline ->
- NioSocketChannel ->
- NioEventLoop ->
- taskQueue (任务队列);
【1.1】场景1-在 netty server的handler中提交普通任务
/*** @Description netty服务器处理器* @author xiao tang* @version 1.0.0* @createTime 2022年08月27日*/
public class SimpleNettyServerHandler45 extends ChannelInboundHandlerAdapter {// 读写数据事件(读取客户端发送的消息)// 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址// 2. Object msg: 客户端发送的数据,默认是 Object@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("server ctx = " + ctx);System.out.println("查看 channel 和 pipeline的关系 ");Channel channel = ctx.channel();ChannelPipeline channelPipeline = ctx.pipeline(); // 管道是双向链表,出栈入栈// 将 msg 转为 ByteBuf 字节缓冲// 这个 ByteBuf 是 netty提供的, 不是 nio的ByteBufferByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));System.out.println("客户端地址:" + ctx.channel().remoteAddress());// 同步任务1// 业务场景: 有一个耗时长的业务 -> 异步执行 -> 提交该 channel对应的 NIOEventLoop 的 taskQueue 中;
// Thread.sleep(10 * 1000);
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是同步任务1", StandardCharsets.UTF_8));
// System.out.println("go on.....");// 以上耗时操作的解决方案1:用户程序自定义的普通任务// 异步任务2ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(10 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是异步任务2-休眠10s", StandardCharsets.UTF_8));} catch (Exception e) {System.out.println("发生异常, " + e.getMessage());}}});// 异步任务3ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(20 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是异步任务3-休眠20s", StandardCharsets.UTF_8));} catch (Exception e) {System.out.println("发生异常, " + e.getMessage());}}});
【1.2】场景2-在 netty server的handler中提交定时任务
// 异步任务2和异步任务3添加到同一个任务队列,由同一个线程来运行,所以异步任务2阻塞10s,而异步任务3会阻塞30s(10+20)System.out.println("异步任务 go on.....");// 用户自定义定时任务 -》 定时任务提交到 scheduledTaskQueue 中ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {try {Thread.sleep(20 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是定时任务1-休眠5s", StandardCharsets.UTF_8));} catch (Exception e) {System.out.println("发生异常, " + e.getMessage());}}}, 5, TimeUnit.SECONDS);System.out.println("定时任务 go on.....");
【代码解说】
- 以上任务提交后,实际会添加到 taskQueue ;
- taskQueue的访问路径如下:
- ChannelHandlerContext ->
- DefaultChannelPipeline ->
- NioSocketChannel ->
- NioEventLoop ->
- taskQueue (任务队列);
- 每个NioEventLoop只能使用1个独立线程运行任务队列中的任务,即多个任务串行执行;
【补充】taskQueue目录树: