netty中的Future,继承自 jdk中的Future,, jdk中的Future,很垃圾,只能同步阻塞获取结果,,,
netty中的Future进行了升级,,可以addListener()
异步获取结果,,可以isSuccess()
判断任务成功还是失败,,
- jdk的Future
- get()
- isDone()
- cancel() : 取消当前任务
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService service = Executors.newFixedThreadPool(2);Future<Integer> future = service.submit(() -> {log.debug("running...");Thread.sleep(2000);return 2;});Integer i = future.get();log.debug("i = " + i);}
- netty中的Future
- isSuccess() : 判断任务是否成功
- sync() : 同步等待,,任务不成功会抛错
- getNow() : 获取结果,没有就返回null
- await() : 同步等待,,任务不成功不会报错,,后面通过isSuccess()判断是否成功
- addListener() : 任务结束回调
public static void main(String[] args) {// netty中的线程池 eventLoop,, eventloop中就一个线程NioEventLoopGroup group = new NioEventLoopGroup(2);EventLoop eventLoop = group.next();Future<String> future = eventLoop.submit(() -> {Thread.sleep(2000);return "hehe";});String now = future.getNow();System.out.println("now = " + now);boolean success = future.isSuccess();System.out.println("success = " + success);future.addListener(new GenericFutureListener<Future<? super String>>() {@Overridepublic void operationComplete(Future<? super String> future) throws Exception {Object now1 = future.getNow();System.out.println("now1 = " + now1);boolean success = future.isSuccess();System.out.println("success = " + success);}});}
- netty中的Promise
继承自netty的Future,
Promise可以设置成功和失败,,不用等到任务结束
public static void main(String[] args) throws ExecutionException, InterruptedException {EventLoopGroup group = new NioEventLoopGroup(2);EventLoop eventLoop = group.next();// 主动创建promise ===> 结果的容器,DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(()->{System.out.println("开始计算");try {int i = 1/0;Thread.sleep(1000);promise.setSuccess(1000);} catch (InterruptedException e) {e.printStackTrace();promise.setFailure(e);}//}).start();Integer i = promise.get();System.out.println("i = " + i);}
ChannelHandler
netty中handler分为两类:
- ChannelInboundHandler : 入站,, 读取数据,,,channel按照添加顺序依次执行
- ChannelOutboundHandler :出站 : 发送数据,,channel 逆序执行
channel.wirte() : 从末尾逆序执行
ctx.wirte() : 是从当前的handler,往前面找ChannelOutboundHandler执行
public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加处理器 ,,, netty会自动添加两个handler,,一个叫head,,一个叫tail,,,// 底层是 双向链表pipeline.addLast("handle01",new ChannelInboundHandlerAdapter(){// 入站的handler,,一般关心的 read@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("msg:{}",msg);ByteBuf byteBuf = (ByteBuf) msg;String s = byteBuf.toString(Charset.defaultCharset());// 调用下一个handle ctx.fireChannelRead(msg);,,并且将处理完成的结果,传递给下一个handlersuper.channelRead(ctx, s);}});pipeline.addLast("handle02",new ChannelInboundHandlerAdapter(){// 入站的handler,,一般关心的 read@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("msg222:{}",msg);User user = new User();user.setName(((String) msg));super.channelRead(ctx, user);}});pipeline.addLast("handle03",new ChannelInboundHandlerAdapter(){// 入站的handler,,一般关心的 read@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("msg333:{}",msg);super.channelRead(ctx, msg);ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));}});// 出站是,,从后面往前走 ,,只有有写出的时候,才会触发出站方法,,,,pipeline.addLast("handle04",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("msg444:{}",msg);super.write(ctx, msg, promise);}});pipeline.addLast("handle05",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("msg555:{}",msg);super.write(ctx, msg, promise);}});}}).bind(new InetSocketAddress(8080));}
EmbeddedChannel 模拟channel执行
public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("msg = " + msg);super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h2 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception {System.out.println(4444);}};EmbeddedChannel channel = new EmbeddedChannel(h1,h2);
// channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));channel.writeOutbound(channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes())));}