Netty实现大文件分块传输详解

文章目录

  • 前言
  • Netty如何处理切分大文件的异步传输


前言

在Netty中,切分大文件进行传输主要利用ChunkedWriteHandler以及它的实现类,如ChunkedFile。这种机制允许你将大文件切分成多个小块(chunks),并通过Netty的pipeline进行异步发送。以下是使用Netty切分大文件进行传输的基本步骤:

  1. 设置Netty的ServerBootstrap或Bootstrap
    首先,你需要配置你的ServerBootstrap或Bootstrap,并添加ChunkedWriteHandler到你的pipeline中。ChunkedWriteHandler会处理所有ChunkedInputChunkedStream的写入。

  2. 创建ChunkedFile对象
    使用ChunkedFile类来创建一个代表大文件的ChunkedInput对象。你需要提供文件的路径,以及每个chunk的大小(或者默认使用ChunkedFile的默认chunk大小)。

  3. 写入Channel
    ChunkedFile对象写入Channel。由于ChunkedFile实现了ChunkedInput接口,Netty会自动处理文件的切分和发送。

下面是一个简单的示例代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;public class FileServer {private final int port;public FileServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 添加HTTP编解码器p.addLast(new HttpServerCodec());// 添加ChunkedWriteHandler,它负责处理ChunkedInput的写入p.addLast(new ChunkedWriteHandler());// 添加自定义的处理器p.addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {// 获取请求的路径String uri = request.uri();// 假设文件都在服务器的某个目录下File file = new File("path/to/your/files" + uri);if (file.exists()) {// 创建ChunkedFile对象ChunkedFile chunkedFile = new ChunkedFile(file);// 设置响应头信息HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);HttpHeaders headers = response.headers();headers.set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");headers.set(HttpHeaderNames.CONTENT_LENGTH, file.length());// 写入响应ctx.write(response);// 写入文件内容ctx.writeAndFlush(chunkedFile);} else {// 文件不存在时的处理HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}}});}});// 绑定端口并开始接收传入的连接ChannelFuture f = b.bind(port).sync();// 等待服务器套接字关闭f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;if (args.length > 0) {port = Integer.parseInt(args[0]);}new FileServer(port).run();}
}

Netty如何处理切分大文件的异步传输

在Netty中处理切分大文件的异步传输通常涉及以下几个步骤:

  1. 读取和切分文件
    Netty提供了ChunkedFile类,它实现了ChunkedInput接口,用于异步地读取和发送文件内容。你可以创建一个ChunkedFile对象,并指定要传输的文件路径和每个chunk的大小。

  2. 写入Chunk到Channel
    ChunkedFile对象写入到Channel中。由于ChunkedFile实现了ChunkedInput接口,Netty会异步地处理文件的读取和发送。你可以使用ctx.writeAndFlush(chunkedFile)来将文件写入到Channel,并异步地发送数据。

  3. ChunkedWriteHandler 处理分块写操作
    ctx.writeAndFlush(chunkedFile)->AbstractChannelHandlerContext->writeAndFlush(Object msg)->write->invokeWriteAndFlush

   private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);}}

invokeWrite0->ChunkedWriteHandler.write()

    @Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {queue.add(new PendingWrite(msg, promise));}

invokeFlush0()->ChunkedWriteHandler.flush()->doFlush

private void doFlush(final ChannelHandlerContext ctx) {final Channel channel = ctx.channel();if (!channel.isActive()) {discard(null);return;}boolean requiresFlush = true;ByteBufAllocator allocator = ctx.alloc();while (channel.isWritable()) {if (currentWrite == null) {currentWrite = queue.poll();}if (currentWrite == null) {break;}final PendingWrite currentWrite = this.currentWrite;final Object pendingMessage = currentWrite.msg;if (pendingMessage instanceof ChunkedInput) {final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;boolean endOfInput;boolean suspend;Object message = null;try {message = chunks.readChunk(allocator);endOfInput = chunks.isEndOfInput();if (message == null) {// No need to suspend when reached at the end.suspend = !endOfInput;} else {suspend = false;}} catch (final Throwable t) {this.currentWrite = null;if (message != null) {ReferenceCountUtil.release(message);}closeInput(chunks);currentWrite.fail(t);break;}if (suspend) {// ChunkedInput.nextChunk() returned null and it has// not reached at the end of input. Let's wait until// more chunks arrive. Nothing to write or notify.break;}if (message == null) {// If message is null write an empty ByteBuf.// See https://github.com/netty/netty/issues/1671message = Unpooled.EMPTY_BUFFER;}ChannelFuture f = ctx.write(message);if (endOfInput) {this.currentWrite = null;// Register a listener which will close the input once the write is complete.// This is needed because the Chunk may have some resource bound that can not// be closed before its not written.// See https://github.com/netty/netty/issues/303f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {closeInput(chunks);currentWrite.fail(future.cause());} else {// read state of the input in local variables before closing itlong inputProgress = chunks.progress();long inputLength = chunks.length();closeInput(chunks);currentWrite.progress(inputProgress, inputLength);currentWrite.success(inputLength);}}});} else if (channel.isWritable()) {f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {closeInput(chunks);currentWrite.fail(future.cause());} else {currentWrite.progress(chunks.progress(), chunks.length());}}});} else {f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {closeInput(chunks);currentWrite.fail(future.cause());} else {currentWrite.progress(chunks.progress(), chunks.length());if (channel.isWritable()) {resumeTransfer();}}}});}// Flush each chunk to conserve memoryctx.flush();requiresFlush = false;}}}
  1. 处理写入完成事件
    你可以通过监听ChannelFuture的完成事件来确定文件是否已经完全发送。当writeAndFlush方法返回的ChannelFuture完成时,表示数据已经被写入到底层的传输层,并且可以被远程客户端接收。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/820581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据分析面试】21.Spotify 音乐数据库搭建(SQL主键和外键)

题目&#xff1a;音乐数据库设计 假设现在你在Spotify工作&#xff0c;你需要设计一个用于存储歌曲元数据的关系型数据库。 数据库要求包括歌曲标题、歌曲长度、歌曲添加到平台的日期、艺术家、专辑、曲目号&#xff08;在专辑中的顺序&#xff09;、歌曲的发行年份以及流派等…

FebHost:为什么注册.BE比利时域名?

.be 是比利时的国家代码顶级域名&#xff08;ccTLD&#xff09;&#xff0c;通常用于与该国有关的网网站。这个域名为那些希望在线上建立与比利时有关联系的个人、公司和组织提供了一个重要的网络标识。 .be 域名于1988年创建&#xff0c;由.BE域名注册机构管理&#xff0c;这…

【k8s】:深入理解 Kubernetes 中的污点(Taints)与容忍度(Tolerations)

【k8s】&#xff1a;深入理解 Kubernetes 中的污点&#xff08;Taints&#xff09;与容忍度&#xff08;Tolerations&#xff09; 1、污点&#xff08;Taints&#xff09;2、容忍度&#xff08;Tolerations&#xff09;3、示例演示-测试污点的具体应用场景3.1 给节点打污点&…

HarmonyOS开发案例:【智能煤气检测】

样例简介 智能煤气检测系统通过实时监测环境中烟雾浓度&#xff0c;当一氧化碳浓度超标时&#xff0c;及时向用户发出警报。在连接网络后&#xff0c;配合数字管家应用&#xff0c;用户可以远程配置智能煤气检测系统的报警阈值&#xff0c;远程接收智能煤气检测系统报警信息。…

多轴机械臂/正逆解/轨迹规划/机器人运动学/Matlab/DH法 学习记录02——机械臂几何法与DH表示法

系列文章目录 本科毕设正在做多轴机械臂相关的内容&#xff0c;这里是一个学习机械臂运动学课程的相关记录。 如有任何问题&#xff0c;可发邮件至layraliufoxmail.com问询。 1. 数学基础 2. 机械臂几何法与DH表示法 文章目录 系列文章目录一、手臂几何法1.机械手臂2.机械手臂…

4.配置USART串口实现printf打印

通过TTL转USB实现电脑和单片机连通,是我们调试必不可少的工具 查看原理图,使用USART1,它们的TX和RX分别在PA9和PA10 新建Usart.c存放串口模块的初始化 这段代码是复制了正点原子的工程,添加到前面 #if SYSTEM_SUPPORT_OS #include "includes.h" //ucos 使用 …

每日两题 / 15. 三数之和 73. 矩阵置零(LeetCode热题100)

15. 三数之和 - 力扣&#xff08;LeetCode&#xff09; 先确定一个数t&#xff0c;对于剩下的两个数&#xff0c;要求两数之和为t的负数 三数之和就退化成了两数之和&#xff0c;两数之和可以用双指针 先排序&#xff0c;左右两个指针&#xff0c;指向的数之和大于目标值&…

AI启示录:既是企业安全利器,也是内部隐藏炸弹

目录 发现潜在威胁->成为内部威胁 精准检测威胁->扰乱正常业务 深度学习->数据中毒 随着网络威胁的迅速发展&#xff0c;专家预计&#xff0c;到 2024 年网络犯罪将给美国公司带来高达 4520 亿美元的损失&#xff0c;且这一数字在未来几年还会迅速增长。除了网络安全措…

鱼哥推荐书籍第18期:《推荐系统》算法 案例与大模型

鱼哥推荐书籍第18期&#xff1a;《推荐系统》算法 案例与大模型 什么是推荐系统&#xff1f;推荐系统解决了哪些问题&#xff1f;推荐系统的应用领域常用的推荐算法&#xff1a;基于内容的推荐算法协同过滤算法&#xff1a; 如何系统学习推荐系统&#xff1a;第一部分&#xff…

IO流-IO框架

简介 java的IO流操作提供了最简单的操作&#xff0c;第三方基于日常使用习惯&#xff0c;写了很多IO框架&#xff0c;更加方便操作避免重复造轮子&#xff0c;提高开发效率 Commons-io 简介 Commons-io是apche提供的IO操作的小框架 部分常用的API 引入依赖 <dependency>…

天工杂志社《天工》杂志社2024年第5期目录

业界翘楚 巍巍者昆仑 煌煌者华夏 乔彦鹏; 6-8 工美史话 日月之光照澈幽冥——墓葬铜镜中柿蒂纹意象辨析 周昕怡;刘春芽; 9-11 西藏传统手工艺技能教育发展历程研究 吕元菊; 12-14 探索研究《天工》投稿&#xff1a;cn7kantougao163.com 传统装饰艺术设计美学的…

HashMap的扩容看这一篇足够

在Java中&#xff0c;对于HashMap这样的实现&#xff0c;put方法是用来将一个键值对插入到Map中的核心方法。以下是HashMap类中put方法的大致执行流程&#xff1a; 计算Hash值&#xff1a; 首先&#xff0c;put方法会接收一个键&#xff08;Key&#xff09;和一个值&#xff0…

局域网管理软件哪个好?局域网电脑管理系统实践案例

之前有一个公司案例&#xff0c;是这样的&#xff1a; 公司名称&#xff1a;智慧科技有限公司 背景&#xff1a; 智慧科技有限公司是一家拥有数百名员工的中型企业&#xff0c;随着业务的快速发展&#xff0c;公司面临着网络管理上的挑战。 员工在日常工作中需要频繁地访问…

微信小程序使用iconfont

进入iconfont&#xff0c;添加至项目 进入项目&#xff0c;点击生成代码&#xff0c;或更新代码 点击打开样式 复制内容到小程序的style文件夹下 最后引入到app.wxss

用ChatGPT读了几百篇文献,我总结出了文献综述这些经验

点击下方▼▼▼▼链接直达AIPaperPass &#xff01; AIPaperPass - AI论文写作指导平台 近期小编会将学术论文写作每一个流程需要了解的细节与ChatGPT在这细节的背景下如何提升我们的学术论文进行分章节讨论。最终汇总成一篇长文攻略。宝子们敬请期待哦。今天我们来详细聊聊文…

22 文件系统

了解了被打开的文件&#xff0c;肯定还有没被打开的文件&#xff0c;就是磁盘上的文件。先从磁盘开始认识 磁盘 概念 内存是掉电易失存储介质&#xff0c;磁盘是永久性存储介质 磁盘的种类有SSD&#xff0c;U盘&#xff0c;flash卡&#xff0c;光盘&#xff0c;磁带。磁盘是…

防止企业数据泄密的四种有效措施

防止企业数据泄密的四种有效措施 泄密大案每天都在上演&#xff0c;受害者既有几十人的小型企业&#xff0c;也有上万人的世界500强&#xff0c;为什么这些企业都难逃数据泄露的噩梦呢&#xff1f;我们应该采取什么措施来防止信息泄密呢&#xff1f; 首先我们来看看数据泄密的…

小肥柴慢慢手写数据结构(C篇)(5-5 Huffuman编码)

小肥柴慢慢学习数据结构笔记&#xff08;C篇&#xff09;&#xff08;5-5 Huffman编码&#xff09; 目录5-16 编码案例5-17 Huffman编码原理5-18 Huffman编码/解码实现5-18-1 大致思路5-18-2 编码实现5-18-3 解码实现5-18-4 测试 5-19 实际案例总结参考文献 目录 5-16 编码案例…

✌粤嵌—2024/4/11—合并区间

代码实现&#xff1a; /*** Return an array of arrays of size *returnSize.* The sizes of the arrays are returned as *returnColumnSizes array.* Note: Both returned array and *columnSizes array must be malloced, assume caller calls free().*/// 交换 void swap(i…

YOLOV5检测+追踪使用deepstream部署(c++版)

文章目录 一、Deepstream1.1 简介1.2 图架构&#xff08;Graph architecture&#xff09;1.3 应用架构&#xff08;Application Architecture&#xff09; 二、配置文件方式运行Deepstream2.1 环境准备2.2 主机运行2.3 配置文件解析2.4 docker运行 三、代码方式运行Deepstream3…