理解io/nio/netty

一、io

io即input/output,输入和输出

1.1 分类

输入流、输出流(按数据流向)
字节流(InputStream/OutputStream(细分File/Buffered))、字符流(Reader/Writer(细分File/Buffered/put))(按数据处理方式)
字节缓存流:避免频繁的io操作,缓冲区的大小默认为 8192 字节

二、字节

  • 字节:存储数据的单元
    1byte=8bit
    一个英文字母=1byte,一个汉字=2byte
  • 字符:1字符=2byte

三、nio

3.1 基本概念

  • 同步:当前任务完成前,不能做其他操作(单线程)
  • 异步:当前任务完成前,可以做其他操作(多线程)
  • 阻塞:当前任务挂起,不能做其他操作的状态(等待)
  • 非阻塞:当前任务进行中,无需挂起,可以做其他操作的状态(一心二用)

3.2 定义

bio为同步阻塞模式
nio为同步非阻塞模式,一个线程管理多个输入输出通道,涉及轮询、多路复用(一个线程不断轮询多个socket的状态,当socket有读写事件时调用io事件)

核心:channel(双向)、buffer、selector(监听通道事件)

3.3 流程

服务器端(pool)
属性:线程池、选择器selector

  • 创建一个PoolServer,
  • 初始化,并指定端口
    开通渠道ServerSocketChannel
    设置非阻塞
    绑定端口
    开通选择器
    将渠道注册到选择器
  • 监听事件
    轮询访问选择器
    处理对应的通道事件
    如果事件key状态为可接收:注册通道到选择器,设置状态为可读
    如果事件key状态为可读:将key对应通道设置为可读,线程池执行key对应的继承Thread的handler方法,重写run方法(通过key拿到通道;分配缓冲区,分配输出流;将通道读取的缓冲区内容写入输出流;将服务端回执写入通道;将通道设置可读;唤醒选择器)

3.4 应用

客户端:

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;public class Client {public static void main(String[] args) throws IOException {Socket s = new Socket("127.0.0.1", 8888);s.getOutputStream().write("HelloServer".getBytes());s.getOutputStream().flush();System.out.println("write over, waiting for msg back...");byte[] bytes = new byte[1024];int len = s.getInputStream().read(bytes);System.out.println(new String(bytes, 0, len));s.close();}
}

服务端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class Server {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888));ssc.configureBlocking(false);System.out.println("server started, listening on :" + ssc.getLocalAddress());Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {SelectionKey key = it.next();it.remove();handle(key);}}}private static void handle(SelectionKey key) {if(key.isAcceptable()) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept();sc.configureBlocking(false);sc.register(key.selector(), SelectionKey.OP_READ );} catch (IOException e) {e.printStackTrace();} finally {}} else if (key.isReadable()) { //flipSocketChannel sc = null;try {sc = (SocketChannel)key.channel();ByteBuffer buffer = ByteBuffer.allocate(512);buffer.clear();int len = sc.read(buffer);if(len != -1) {System.out.println(new String(buffer.array(), 0, len));}ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes());sc.write(bufferToWrite);} catch (IOException e) {e.printStackTrace();} finally {if(sc != null) {try {sc.close();} catch (IOException e) {e.printStackTrace();}}}}}
}

服务端:pool

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class PoolServer {ExecutorService pool = Executors.newFixedThreadPool(50);private Selector selector;/**** @throws IOException*/public static void main(String[] args) throws IOException {PoolServer server = new PoolServer();server.initServer(8000);server.listen();}/**** @param port* @throws IOException*/public void initServer(int port) throws IOException {//ServerSocketChannel serverChannel = ServerSocketChannel.open();//serverChannel.configureBlocking(false);//serverChannel.socket().bind(new InetSocketAddress(port));//this.selector = Selector.open();serverChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务端启动成功!");}/**** @throws IOException*/@SuppressWarnings("unchecked")public void listen() throws IOException {// 轮询访问selector  while (true) {//selector.select();//Iterator ite = this.selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey) ite.next();//ite.remove();//if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();//SocketChannel channel = server.accept();//channel.configureBlocking(false);//channel.register(this.selector, SelectionKey.OP_READ);//} else if (key.isReadable()) {//key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));//pool.execute(new ThreadHandlerChannel(key));}}}}
}/**** @param* @throws IOException*/
class ThreadHandlerChannel extends Thread{private SelectionKey key;ThreadHandlerChannel(SelectionKey key){this.key=key;}@Overridepublic void run() {//SocketChannel channel = (SocketChannel) key.channel();//ByteBuffer buffer = ByteBuffer.allocate(1024);//ByteArrayOutputStream baos = new ByteArrayOutputStream();try {int size = 0;while ((size = channel.read(buffer)) > 0) {buffer.flip();baos.write(buffer.array(),0,size);buffer.clear();}baos.close();//byte[] content=baos.toByteArray();ByteBuffer writeBuf = ByteBuffer.allocate(content.length);writeBuf.put(content);writeBuf.flip();channel.write(writeBuf);//if(size==-1){channel.close();}else{//key.interestOps(key.interestOps()|SelectionKey.OP_READ);key.selector().wakeup();}}catch (Exception e) {System.out.println(e.getMessage());}}
}

四、netty

netty是JBoss提供的开源网络编程框架,提供异步的、基于事件驱动的网络应用程序框架和工具。

架构
三层网络架构,Reactor 通信调度层 -> 职责链 PipeLine -> 业务逻辑处理层

为什么选择netty

  • API使用简单,开发门槛低
  • 功能强大,预置了多种编解码功能,支持多种主流协议
  • 定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展
  • 性能高,通过与其他业界主流的nio框架对比,netty的综合性能最优
  • 成熟、稳定,netty修复了已经发现的所有的JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼
  • 社区活跃,版本迭代周期短,发现的BUGkey倍及时修复,同时更多的新功能会被加入
  • 经历了大规模的商业应用考验,质量已经得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它可以完全满足不同行业的商业应用。

4.1 流程

在这里插入图片描述
netty的接收和发送ByteBuffer采用direct buffers,使用堆外直接内存进行socket读写,不需要进行字节缓冲区的二次拷贝。(如果使用传统的堆内存进行socket读写,JVM会将堆内存buffer拷贝一份到直接内存中,然后才写入socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。)

  • 服务端:

创建服务端并指定端口,启动服务端
创建boss和worker事件组
绑定事件组到通道,并指定子处理器,初始化通道,将处理器(继承ChannelHandlerContext,重写读方法(获取信息,将信息写入上下文,关闭上下文)以及异常捕获方法(关闭上下文))加到管道的最后
绑定端口获取future

  • 客户端:

创建客户端,启动客户端
创建workers事件组
绑定事件组到通道,并指定处理器,初始化通道,将定义的客户端处理器(继承ChannelInboundHandlerAdapter,重写通道激活方法(将信息写入上下文,获取future,添加监听器,当服务端收到信息时输出提示信息)以及读方法(读取信息,最后释放信息))添加到管道的后面
绑定端口,获取future

4.2 应用

服务端

import com.mashibing.io.aio.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;public class HelloNetty {public static void main(String[] args) {new NettyServer(8888).serverStart();}
}class NettyServer {int port = 8888;public NettyServer(int port) {this.port = port;}public void serverStart() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new Handler());}});try {ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}class Handler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//super.channelRead(ctx, msg);System.out.println("server: channel read");// ByteBuf是netty的一个字节容器ByteBuf buf = (ByteBuf)msg;System.out.println(buf.toString(CharsetUtil.UTF_8));ctx.writeAndFlush(msg);ctx.close();//buf.release();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//super.exceptionCaught(ctx, cause);cause.printStackTrace();ctx.close();}
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;public class Client {public static void main(String[] args) {new Client().clientStart();}private void clientStart() {EventLoopGroup workers = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(workers).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("channel initialized!");ch.pipeline().addLast(new ClientHandler());}});try {System.out.println("start to connect...");ChannelFuture f = b.connect("127.0.0.1", 8888).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workers.shutdownGracefully();}}}class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel is activated.");final ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("HelloNetty".getBytes()));f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println("msg send!");//ctx.close();}});}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {ByteBuf buf = (ByteBuf)msg;System.out.println(buf.toString());} finally {ReferenceCountUtil.release(msg);}}
}

场景

  • 构建高性能、低时延的各种Java中间件,
    例如MQ、分布式服务框架、ESB消息总线,netty主要作为基础框架提供高性能、低时延的通信服务
  • 共有或者私有协议栈的基础通信框架,
    例如可以基于netty构建异步、高性能的websocket协议栈
  • 各领域应用,netty作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信
    例如大数据、游戏等

4.3 拆包器

TCP拆包粘包

发送的数据出现断开接收或者多个包数据发生粘连

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包
  • 待发送数据大于MSS最大报文长度,TCP在传输前将进行拆包
  • 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包
  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

解决方法

  • 发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度 了
  • 发送端将每个数据包封装为固定长度,这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开
  • 可以在数据包之间设置边界,如添加特殊符号,这样接收端通过这个边界就可以将不同的数据包拆分开

netty提供了封装的拆包器:

  • 固定长度
  • 分隔符
  • 基于长度域(最通用)

4.4 零拷贝

传统拷贝:需要4次数据拷贝和4次上下文切换
磁盘->内核缓冲区的read buffer->用户缓冲区->内核的socket buffer->网卡接口(硬件)的缓冲区

零拷贝:省略中间的2步,不需要CPU的参与
磁盘->内核缓冲区的read buffer->网卡接口(硬件)的缓冲区
零拷贝是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间,而直接在内核空间中传输到网络的方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579075.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能优化算法应用:基于卷积优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于卷积优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于卷积优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.卷积优化算法4.实验参数设定5.算法结果6.…

[AIGC] ArrayList介绍

在Java编程中&#xff0c;我们经常需要存储和处理一组数据。为了更方便地管理数据集合&#xff0c;Java提供了许多集合类。其中之一就是ArrayList。 文章目录 是什么为什么怎么用总结 是什么 ArrayList是Java中的一个动态数组类&#xff0c;它实现了List接口。它可以自动调整大…

完美解决org.apache.jasper.JasperException: java.lang.ClassNotFoundException: org.apache.jsp.index_jsp

已解决org.apache.jasper.JasperException: java.lang.ClassNotFoundException: org.apache.jsp.index_jsp 下滑查看解决方法 文章目录 报错问题解决思路解决方法交流 报错问题 org.apache.jasper.JasperException: java.lang.ClassNotFoundException: org.apache.jsp.index_…

LINUX自启动线程学习笔记

由于嵌入式处理器的性能不够&#xff0c;希望尽量能减少不必要的启动进程。对处理器启动进程进行了学习&#xff0c;了解哪些相关介绍判断哪些必须保留的线程。 当前我使用的LINUX 系统启动后的线程 # ps PID USER COMMAND1 root init //启动部分的和GRUB相关2 ro…

Duboo-入门到学废【上篇】

目录 1&#x1f95e;.什么是duboo 2&#x1f32d;.架构图 3.&#x1f37f;快速入门 4.&#x1f9c7;浅浅理解 1.什么是duboo&#x1f936;&#x1f936;&#x1f936; Dubbo是一个由阿里巴巴开发的基于Java的开源RPC框架。它提供了高性能、透明化的远程方法调用&#xff0…

JWT 单点登录探析:原理、用途与安全实践

JWT 单点登录探析&#xff1a;原理、用途与安全实践 什么是 JWT&#xff1f; JWT &#xff08;JSON Web Token&#xff09; 是目前最流行的跨域认证解决方案&#xff0c;是一种基于 Token 的认证授权机制。 从 JWT 的全称可以看出&#xff0c;JWT 本身也是 Token&#xff0c…

给WordPress网站添加返回顶部按钮

给WordPress网站底部添加一个按钮&#xff0c;点它就可以现实快速返回到顶部。有两种方法可以现实&#xff0c;一种是通过安装相关插件来实现。另外一种方式就是以纯属代码的方式来实现。 给WordPress网站底部添加一个按钮&#xff0c;点它就可以现实快速返回到顶部。有两种方…

操作系统 面试第一弹

1. 进程和线程的区别 进程&#xff08;Process&#xff09;和线程&#xff08;Thread&#xff09;是操作系统中的重要概念&#xff0c;它们表示执行中的程序的不同执行单元。下面是它们的区别&#xff1a; 定义&#xff1a;进程是一个独立的执行环境&#xff0c;具有独立的内存…

【深度学习】DataComp论文,数据集介绍,大数据模型的数据集介绍

参考&#xff1a; https://laion.ai/blog/datacomp/ 论文&#xff1a;https://arxiv.org/abs/2304.14108 文章目录 论文报告的一些内容datacomp-1B 数据质量比lainon2B要好不同规模数据有多少数据数据处理数据来源 论文报告的一些内容 摘要 多模态数据集是近期如CLIP、Stable …

TCP服务器的演变过程:IO多路复用机制select实现TCP服务器

IO多路复用机制select实现TCP服务器 一、前言二、新增使用API函数2.1、select()函数2.2、FD_*系列函数 三、实现步骤四、完整代码五、TCP客户端5.1、自己实现一个TCP客户端5.2、Windows下可以使用NetAssist的网络助手工具 小结 一、前言 手把手教你从0开始编写TCP服务器程序&a…

洛谷——【数据结构1-2】二叉树

文章目录 题目【深基16.例1】淘汰赛题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1基本思路&#xff1a;代码 【深基16.例3】二叉树深度题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1基本思路&#xff1a;代码 [USACO3.4] 美国血统 American Heritage题目描…

算符优先语法分析设计原理与实现

前言&#xff1a; 作者的词法分析程序以及算符优先语法分析设计程序仓库链接 1、目标任务 **[实验项目] **以专题 1 词法分析程序的输出为语法分析的输入&#xff0c;实现算符优先分析算法&#xff0c;完成以下描述算术表达式的算符优先文法的算符优先分析过程。 G[E]:E→E…

Spark编程实验三:Spark SQL编程

目录 一、目的与要求 二、实验内容 三、实验步骤 1、Spark SQL基本操作 2、编程实现将RDD转换为DataFrame 3、编程实现利用DataFrame读写MySQL的数据 四、结果分析与实验体会 一、目的与要求 1、通过实验掌握Spark SQL的基本编程方法&#xff1b; 2、熟悉RDD到DataFram…

代码随想录算法训练营第二十七天 | 回溯算法part4

力扣题目 用时&#xff1a;未知 1、93.复原IP地址 2、78.子集 3、90.子集II 力扣题目记录 93.复原IP地址 这个题和上一个题差不多&#xff0c;只是4段数字&#xff0c;只有三个句点&#xff0c;所以第四段要单独处理判断子串是否合法 class Solution { private:vector<s…

Qt/QML编程学习之心得:实现一个图片浏览器(十八)

QML中有个重要控件,经常使用就是image,通常可以用它来显示一张图片。如果想结合openfiledialog来让image显示图片,也就是做一个简易的图片浏览器,怎么弄呢? DefaultFileDialog.qml: import QtQuick 2.0 import QtQuick.Dialogs 1.0FileDialog {id: fileDialogtitle: &qu…

2024免费的数据恢复软件EasyRecovery14自己操作就能恢复的方法

而今天小编为大家还是带来了同系列软件easyrecovery14&#xff0c;这是easyrecovery数据恢复软件中的技术员版本&#xff0c;不仅包含家庭版和专业版的所有功能&#xff0c;而且还旨在简化技术人员的数据恢复过程。软件拥有强大的数据恢复功能&#xff0c;支持使用的恢复场景有…

KNN与KD树博客总结

目录 总结小结&#xff1a; 总结 原始篇&#xff1a;KNN算法及其优缺点算法思想改进篇&#xff1a;KD树&#xff08;KNN的plus版算法实现第一篇&#xff1a;平衡二叉树的构建&#xff08;递归算法实现第二篇&#xff1a;KD树的构建&#xff08;递归算法实现第三篇&#xff1a;…

CentOS 7 设置网络

CentOS 7 设置网络 正常情况 ①登陆进去之后使用下面的命令修改文件 echo ONBOOTyes >> /etc/sysconfig/network-scripts/ifcfg-ens33②如果是虚拟机重启后使用如下命令进行查看IP地址 ip addr注&#xff1a;到这里如果显示有两部分&#xff0c;则代表网络设置成功&a…

华为设备VRP系统管理

为了满足企业业务对网络的需求&#xff0c;网络设备中的系统文件需要不断进行升级。另外&#xff0c;网络设备中的配置文件也需要时常进行备份&#xff0c;以防设备故障或其他灾害给业务带来损害。在升级和备份系统文件或配置文件时&#xff0c;经常会使用FTP和TFTP来传输文件。…

服务器系统时间不同步如何处理

在分布式计算环境中&#xff0c;服务器系统时间的同步至关重要。然而&#xff0c;由于各种原因&#xff0c;服务器系统时间不同步的问题时有发生,这可能会导致严重的问题&#xff0c;如日志不准确、证书验证失败等。下面我们可以一起探讨下造成服务器系统时间不同的原因以及解决…