node中的Stream-Readable和Writeable解读

在node中,只要涉及到文件IO的场景一般都会涉及到一个类-Stream。Stream是对IO设备的抽象表示,其在JAVA中也有涉及,主要体现在四个类-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream类针对字节数据进行读写;Reader和Writer针对字符数据读写。同时Java中有多种针对这四种类型的扩展类,如节点流、缓冲流和转换流等。比较而言,node中Stream类型也和Java中的类似,同样提供了支持字节和字符读写的Readable和Writeable类,也存在转换流Transform类,本文主要分析node中Readable和Writeable的实现机制,从底层的角度更好的理解Readable和Writeable实现机制,解读在读写过程中发生的一些重要事件。

Readable类

Readable对应于Java中的InputStream和Reader两个类,针对Readable设置encode编码可完成内部数据由Buffer到字符的转换。Readable Stream有两种模式,即flowing和paused模式。这两种模式对于用户而言区别在于是否需要手动调用Readable.prototype.read(n),读取缓冲区的数据。查询node API文档可知触发flowing模式有三种方式:

  • 侦听data事件
  • readable.resume()
  • readable.pipe()
    而触发paused模式同样有几种方式:
  • 移除data事件
  • readable.pause()
  • readable.unpipe()
    可能这样讲解大家仍不明白Readable Stream这两种模式的区别,那么下文从更深层次分析两种模式的机制。

深入Readable的实现

Readable继承EventEmitter,大家也都知道。但是相信大家应该不怎么熟悉Readable的实例属性**_readableState**。该属性是一个ReadableState类型的对象,保存了Readable实例的重要信息,如读取模式(是否为对象模式)、highWaterMark(缓冲区存放的最大字节数)、缓冲区、flowing模式等。在Readable的实现中,处处使用ReadableState对象记录当前读取状态,并设置缓冲区保证读操作的顺利进行。

首先需要针对Readable.prototype.read方法进行特别解读:

  if (n === 0 &&state.needReadable &&(state.length >= state.highWaterMark || state.ended)) {debug('read: emitReadable', state.length, state.ended);if (state.length === 0 && state.ended)endReadable(this);elseemitReadable(this);return null;}

当读入的数据为0时,执行emitReadable操作。这意味着,针对Readable Stream执行read(0)方法会触发readable事件,但是不会读当前缓冲区。因此使用read(0)可以完成一些比较巧妙的事情,如在readable处理函数中可以使用read(0)触发下一次readable事件,可选的操作读缓冲区。

继续分析代码,如果读入的数据并不是0,则计算读取缓冲区的具体字节数,

n = howMuchToRead(n, state);function howMuchToRead(n, state) {if (state.length === 0 && state.ended)return 0;if (state.objectMode)return n === 0 ? 0 : 1;if (n === null || isNaN(n)) {// only flow one buffer at a timeif (state.flowing && state.buffer.length)return state.buffer[0].length;// 若是paused状态,则读全部的缓冲区elsereturn state.length;}if (n <= 0)return 0;if (n > state.highWaterMark)state.highWaterMark = computeNewHighWaterMark(n);// don't have that much.  return null, unless we've ended.if (n > state.length) {if (!state.ended) {state.needReadable = true;return 0;} else {return state.length;}}return n;
}

针对对象模式的读取,每次只读一个;对于处在flowing模式下的读取,每次只读缓冲区中第一个buffer的长度;在paused模式下则读取全部缓冲区的长度;若读取的字节数大于设置的缓冲区最大值,则适当扩大缓冲区的大小(默认为16k,最大为8m);若读取的长度大于当前缓冲区的大小,设置needReadable属性并准备数据等待下一次读取。

接下来,判断是否需要准备数据。在这里,依赖于needReadable的值,

var doRead = state.needReadable;debug('need readable', doRead);if (state.length === 0 || state.length - n < state.highWaterMark) {doRead = true;debug('length less than watermark', doRead);}// reading, then it's unnecessary.if (state.ended || state.reading) {doRead = false;debug('reading or ended', doRead);}

如果当前缓冲区为空,或者缓冲区并未超出我们设定的最大值,那么就可以继续准备数据;如果此时正在准备数据或者已经结束读取,那么就放弃准备数据。一旦doRead为true,那么进入准备数据阶段,

if (doRead) {debug('do read');state.reading = true;state.sync = true;// if the length is currently zero, then we *need* a readable event.if (state.length === 0)state.needReadable = true;// call internal read method// 默认Readable未实现_read,抛出Error// 针对自定义的Readable子类,_read可修改state.buffer的数量,进行预处理,// 然后由下面的fromList读出去缓存中的相关数据this._read(state.highWaterMark);state.sync = false;}

接下来设置相关的标志位,进行_read处理。针对这个私有方法_read,文档上有特殊说明,自定义的Readable实现类需要实现这个方法,在该方法中手动添加数据到Readable对象的读缓冲区,然后进行Readable的读取。可以理解为_read函数为读取数据前的准备工作(准备数据),针对的是流的实现者而言。

  if (doRead && !state.reading)n = howMuchToRead(nOrig, state);var ret;if (n > 0)ret = fromList(n, state);elseret = null;if (ret === null) {state.needReadable = true;n = 0;}state.length -= n;if (state.length === 0 && !state.ended)state.needReadable = true;if (nOrig !== n && state.ended && state.length === 0)endReadable(this);// flowing模式下的数据读取依赖于 read函数// data事件触发的次数,依赖于howMuchToRead计算的次数if (ret !== null)this.emit('data', ret);

一旦在_read中更新了缓冲区,那么我们需要重新计算(消费者,即可写流)读取的字节数。fromList方法完成了读缓冲区的slice,如果是objectMode下的读,则只读缓冲区的第一个对象;针对未传参数的read方法而言,默认读取全部缓冲区等等。从读缓冲区读取完数据之后设置相关flag,如needReadable,最终,触发data事件,结束!

上节提到,设置data事件的执行函数会进入flowing模式的读,而上文看到正是read方法触发了data事件,而默认条件下Readable处于paused状态,因此在paused状态读取数据需要手动执行read函数,每次read读取完毕触发一次data事件。从这点看出,flowing和paused状态区别在于是否需要手动执行read()来获取数据。flowing状态下,我们无需执行read,仅需要设置data事件处理函数或者设定导流目标pipe;而在paused状态下,不仅仅是简单的执行read方法,因为读缓冲区的内容时刻在改变,一旦读缓冲区又有新数据,简单执行read()就没法满足需求(因为我们无法知道是否又有新数据到来),因此需要侦听读缓冲区的相关事件,即readable事件,在该事件处理函数中进行read相关数据。

那么,什么情况下会触发readable事件呢?在实现_read私有方法中,我们使用stream.push(chunk)或stream.unshift(chunk)方法注入数据到读缓冲区,那么push和unshift方法都实现了下面的逻辑,

if (state.flowing && state.length === 0 && !state.sync) {stream.emit('data', chunk);stream.read(0);
} else {// update the buffer info.state.length += state.objectMode ? 1 : chunk.length;if (addToFront)state.buffer.unshift(chunk);elsestate.buffer.push(chunk);if (state.needReadable)emitReadable(stream);
}function emitReadable(stream) {var state = stream._readableState;state.needReadable = false;if (!state.emittedReadable) {debug('emitReadable', state.flowing);state.emittedReadable = true;if (state.sync)process.nextTick(emitReadable_, stream);elseemitReadable_(stream);}
}function emitReadable_(stream) {debug('emit readable');stream.emit('readable');flow(stream);
}
// 在flowing状态下,自动读取流(替代paused状态下手动read)
function flow(stream) {var state = stream._readableState;debug('flow', state.flowing);if (state.flowing) {do {var chunk = stream.read();} while (null !== chunk && state.flowing);}
}

一旦处于flowing模式并且当前缓冲区没有数据,那么就立即将预处理的push(unshift)数据传递给data事件处理函数,并执行stream.read(0)。前文已经交代过,read(0)仅仅用来触发readable事件,并不读取缓冲区,这就是触发readable的第一种情况。

第二种则是第一种情况之外的所有情景,即根据操作(push、unshift)的不同将数据插入读缓冲区的不同位置。最后执行emitReadable函数,触发readable事件。针对emitReadable函数,它的作用就是异步触发readable事件,并执行flow函数。flow函数则针对flowing状态的Readable做自适应读取,免去了手动执行read函数和何时执行read函数的苦恼。

这样,对于Readable的实现者,一旦在_read函数插入有效数据到读缓冲区,都会触发readable事件,在paused状态下,设置readable事件处理函数并手动执行read函数,便可完成数据的读取;而在flowing状态下,通过设置data事件处理函数或者定义pipe目标流同样可以实现读取。

既然pipe同样可以触发Readable进入flowing状态,那么pipe方法具体做了什么呢?其实pipe针对Readable和Writeable做了限流,首先针对Readable的data事件进行侦听,并执行Writeable的write函数,当Writeable的写缓冲区大于一个临界值(highWaterMark),导致write函数返回false(此时意味着Writeable无法匹配Readable的速度,Writeable的写缓冲区已经满了),此时,pipe修改了Readable模式,执行pause方法,进入paused模式,停止读取读缓冲区。而同时Writeable开始刷新写缓冲区,刷新完毕后异步触发drain事件,在该事件处理函数中,设置Readable为flowing状态,并继续执行flow函数不停的刷新读缓冲区,这样就完成了pipe限流。需要注意的是,Readable和Writeable各自维护了一个缓冲区,在实现的上有区别:Readable的缓冲区是一个数组,存放Buffer、String和Object类型;而Writeable则是一个有向链表,依次存放需要写入的数据。

Writeable解读

Writeable对应Java的OutputStream和Writer类,实现字节和字符数据的写。与Readable类似,Writeable的实例对象同样维护了一个状态对象-WriteableState,记录了当前输出流的状态信息,如写缓冲区的最大值(hightWaterMark)、缓冲区(有向链表)和缓冲区长度等信息。在本节中,主要分析输出流的关键方法write和事件drain,并解析输出流的实现者需要实现的方法**_writewrite**的关系。

function write
----------------------------
if (state.ended)writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {state.pendingcb++;ret = writeOrBuffer(this, state, chunk, encoding, cb);}return ret;

在write方法中,判断写入数据的格式并执行writeOrBuffer函数,并返回执行结果,该返回值标示当前写缓冲区是否已满。真正执行写入逻辑的是writeOrBuffer函数,该函数的作用在于刷新或者更新写缓冲区,下面看看主要做了什么,

function writeOrBuffer(stream, state, chunk, encoding, cb) {chunk = decodeChunk(state, chunk, encoding);if (chunk instanceof Buffer)encoding = 'buffer';var len = state.objectMode ? 1 : chunk.length;state.length += len;// 如果缓存的长度大于highWaterMark,需要刷新缓冲,所以设置needDrain标志var ret = state.length < state.highWaterMark;// we must ensure that previous needDrain will not be reset to false.if (!ret)state.needDrain = true;// 缓存未处理的写请求,在clearBuffer中执行缓存// 由此看出,Readable和Writeable都有缓存,Readable 中缓存的方式是数组(项为Buffer,字符串或对象),Writeable的// 缓存则是对象链表if (state.writing || state.corked) {var last = state.lastBufferedRequest;state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);if (last) {last.next = state.lastBufferedRequest;} else {state.bufferedRequest = state.lastBufferedRequest;}state.bufferedRequestCount += 1;} else {doWrite(stream, state, false, len, chunk, encoding, cb);}return ret;
}

writeOrBuffer首先针对数据进行编码,字符串转换成Buffer类型,如果设置了Writeable的ObjectMode模式则仍为Object类型;接下来更新写缓冲区的长度,并判断写缓冲区长度是否超过设定的Writeable的最大值(默认16k),如果超过超过则ret=false并更新WriteableState的属性needDrain=true。ret的结果其实就是write方法返回值,因此一旦write返回值为false,意味着当前写缓冲区已满,需要停止继续写入数据。

在Readable的pipe方法中,涉及到了Writeable的drain事件。该事件的触发意味着写缓冲区已可以继续缓存数据,可见drain事件与写缓冲区严格相关。继续分析writeOrBuffer函数,若当前输出流正在写数据,那么则当前数据缓存至写缓冲区(创建WriteReq对象);否则执行doWrite函数,刷新缓冲区。

function doWrite(stream, state, writev, len, chunk, encoding, cb) {state.writelen = len;state.writecb = cb;state.writing = true;state.sync = true;if (writev)stream._writev(chunk, state.onwrite);elsestream._write(chunk, encoding, state.onwrite);state.sync = false;
}

doWrite函数设置了需要写入数据的长度、写入状态等信息,并执行输出流实现者需要实现的_write函数。在_write函数中,针对数据流向做最后的处理,这里分析_write函数的具体实现。_write函数有三个参数,分别为chunk,encoding和state.onwrite回调函数,对该回调函数稍后分析,先着重讲解_write函数的实现。在node的fs模块中,可以通过fs.createWriteStream创建Writeable实例,通过执行

var writeStream = fs.createWriteStream('./output',{decodeStrings: false});
console.log(writeStream._write.toString());-----------------输出-----------------function (data, encoding, cb) {if (!(data instanceof Buffer))return this.emit('error', new Error('Invalid data'));if (typeof this.fd !== 'number')return this.once('open', function() {this._write(data, encoding, cb);});var self = this;fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {if (er) {self.destroy();return cb(er);}self.bytesWritten += bytes;cb();});if (this.pos !== undefined)this.pos += data.length;
}

看出,在_write实现中,只接受Buffer类型的数据,接着执行fs.write操作,写入到对应文件描述符fd对应的文件中,写入成功或失败后执行回调函数,即state.onwrite函数。

function onwrite(stream, er) {var state = stream._writableState;var sync = state.sync;var cb = state.writecb;onwriteStateUpdate(state);// 默认未重写_write方法,会收到er值if (er)onwriteError(stream, state, sync, er, cb);else {// Check if we're actually ready to finish, but don't emit yetvar finished = needFinish(state);// 写缓存的数据if (!finished &&!state.corked &&!state.bufferProcessing &&state.bufferedRequest) {clearBuffer(stream, state);}// 异步触发drain事件if (sync) {process.nextTick(afterWrite, stream, state, finished, cb);} else {afterWrite(stream, state, finished, cb);}}
}

在state.onwrite函数中主要工作有两个:

  • 写缓冲区的数据
  • 写完缓冲区的数据后,异步触发drain事件

第一步,在clearBuffer函数中,就是取出写缓冲区(有向链表)的第一个WriteReq对象,执行doWrite函数,写入缓冲区的第一个数据;这样循环往复最终清空写缓冲区,重置一些标志位。

第二步,异步执行afterWrite函数,触发drain事件,并判断是否写操作完毕触发“finish”事件。这里之所以强调异步触发drain事件,是因为为了保证先获得write()返回值为false,给用户绑定drain处理函数的时隙,然后再触发drain事件。

至此,Writeable的重要流程已全部走通。可以看出来,在核心的write()中,判断写缓冲区是否已满并返回该值,在适当条件下缓存数据或调用_write()写数据,在Writeable实现者需要实现的** _write() 中,主要任务是数据写入方向控制,完成最基本的任务**。

总结

对比Readable的read()和_read(),我总结了下这四个函数在“读写过程”中的执行顺序与关系,如下图所示:
Readable和Writeable的函数执行顺序

转载于:https://www.cnblogs.com/accordion/p/5560531.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/395816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新Rider预览版发布,对F#的支持是亮点

JetBrains一直在改进自己的跨平台.NET IDE产品Rider&#xff0c;努力使其成为Visual Studio家族产品可承担职能的重要替代者。于今年四月发布的Rider预览版&#xff08;EAP 21&#xff09;提供了一些新特性&#xff0c;其中的亮点在于对函数式编程语言F#的支持。\\鉴于这是Ride…

java代码整合_java合并多个文件的实例代码

在实际项目中&#xff0c;在处理较大的文件时&#xff0c;常常将文件拆分为多个子文件进行处理&#xff0c;最后再合并这些子文件。下面就为各位介绍下Java中合并多个文件的方法。Java中合并子文件最容易想到的就是利用BufferedStream进行读写。具体的实现方式如下&#xff0c;…

正则表达式的一些规则

1.限定修饰符只对其紧前的元字符有效 String rex8 "\\d\\D"; 上式中&#xff0c;只对\\D有效&#xff0c;即有至少有1个&#xff08;1个或多个&#xff09;非数字&#xff0c;\\d仍然只许有一个数字。 2.[1,2,3]和[123]是一样的转载于:https://www.cnblogs.com/Sabr…

2016版单词的减法_在2016年最大的电影中,女性只说了27%的单词。

2016版单词的减法by Amber Thomas通过琥珀托马斯 在2016年最大的电影中&#xff0c;女性只说了27&#xff05;的单词。 (Women only said 27% of the words in 2016’s biggest movies.) Movie trailers in 2016 promised viewers so many strong female characters. Jyn Erso…

软件工程博客---团队项目---个人设计2(算法)

针对分析我们团队项目的需求&#xff0c;我们选定Dijkstra算法。 算法的基本思想&#xff1a; Dijkstra算法是由E.W.Dijkstra于1959年提出&#xff0c;又叫迪杰斯特拉算法&#xff0c;它应用了贪心算法模式&#xff0c;是目前公认的最好的求解最短路径的方法。算法解决的是有向…

UWP 杂记

UWP用选取文件对话框 http://blog.csdn.net/u011033906/article/details/65448394 文件选取器、获取文件属性、写入和读取、保存读取和删除应用数据 https://yq.aliyun.com/articles/839 UWP判断文件是否存在 http://blog.csdn.net/lindexi_gd/article/details/51387901…

微信上传素材 java_微信素材上传(JAVA)

public String uploadMaterial(String url,InputStream sbs,String filelength,String filename, String type) throws Exception {try {DataInputStream innew DataInputStream(sbs);url url.replace("TYPE", type);URL urlObj new URL(url);// 创建Http连接HttpU…

SQL Server读写分离之发布订阅

一、发布 上面有多种发布方式&#xff0c;这里我选择事物发布&#xff0c;具体区别请自行百度。 点击下一步、然后继续选择需要发布的对象。 如果需要筛选发布的数据点击添加。 根据自己的计划选择发布的时间。 点击安全设置&#xff0c;设置代理信息。 最后单击完成系统会自动…

码农和程序员的几个重要区别!

如果一个企业老板大声嚷嚷说&#xff0c;“我要招个程序员”&#xff0c;那么十之八九指的是“码农”——一种纯粹为了钱而写代码的技术人员。这其实是一种非常狭隘和错误的做法&#xff0c;原因么&#xff0c;且听我一一道来。1、码农写代码&#xff0c;程序员写系统从本质上讲…

sql server2008禁用远程连接

1.打开SQL Server 配置管理器&#xff0c;双击左边 SQL Server 网络配置&#xff0c;点击TCP/IP协议,在协议一栏中,找到 全部侦听,修改为否&#xff0c;然后点击IP地址,将IP地址为127.0.0.1(IPV4)或::1(IPV6)的已启用修改为是,其它的IP地址的已启用修改为否 注意&#xff1a;如…

snapchat注册不到_从Snapchat获得开发人员职位中学到的经验教训

snapchat注册不到Here are three links worth your time:这是三个值得您花费时间的链接&#xff1a; I just got a developer job at Snapchat. Here’s what I learned and how it can help you with your job search (15 minute read) 我刚刚在Snapchat获得开发人员职位。 这…

java bitmap jar_Java面试中常用的BitMap代码

引言阿里内推面试的时候被考了一道编程题&#xff1a;10亿个范围为1~2048的整数&#xff0c;将其去重并计算数字数目。我看到这个题目就想起来了《编程珠玑》第一章讲的叫做BitMap的数据结构&#xff0c;但是我并没有在java上实现过&#xff0c;这就比较尴尬了&#xff0c;再加…

移动端工程架构与后端工程架构的思想摩擦之旅(1)

此文已由作者黎星授权网易云社区发布。欢迎访问网易云社区&#xff0c;了解更多网易技术产品运营经验记资源投放后端工程的架构调整与优化 架构思考一直以来对软件工程架构有着极大的兴趣&#xff0c;无论是之前负责的移动端Android工程&#xff0c;亦或是现在转到后端开发后维…

View野指针问题分析报告

【问题描述】 音乐组同事反馈了一个必现Native Crash问题&#xff0c;tombstone如下&#xff1a; pid: 5028, tid: 5028, name: com.miui.player >>> com.miui.player <<< signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 79801f28r0 7ac59c98 r1 …

SicilyFunny Game

一、题目描述 Two players, Singa and Suny, play, starting with two natural numbers. Singa, the first player, subtracts any positive multiple of the lesser of the two numbers from the greater of the two numbers, provided that the resulting number must be non…

java 分布式同步_Java Web分布式集群搭建(三)——Session同步

对于一个业务系统的Tomcat集群来说&#xff0c;必须保证同一个用户访问到任一台服务器上都可以维持之前操作的身份。比如在服务器A进行了登陆&#xff0c;那么在服务器B中也要同步该用户已登录的状态&#xff0c;这里就用到了Session的同步。同步方式sticky模式、复制模式、Ter…

移动应用程序和网页应用程序_如何不完全破坏您的移动应用程序的用户界面

移动应用程序和网页应用程序by Luke Konior卢克科尼尔(Luke Konior) 如何不完全破坏您的移动应用程序的用户界面 (How to not utterly ruin your mobile app’s user interface) There’s no single universal formula for designing a great user interface (if you discover…

logging记录日志

日志是一个系统的重要组成部分&#xff0c;用以记录用户操作、系统运行状态和错误信息。日志记录的好坏直接关系到系统出现问题时定位的速度。logging模块Python2.3版本开始成为Python标准库的一部分。 日志级别 在最简单的使用中&#xff0c;我们直接导入logging模块&#xff…

C#编程之接口

1.定义 接口是把公共方法和属性组合起来&#xff0c;以封装特定功能的一个集合。&#xff08;一旦定义了接口&#xff0c;就可以在类中实现它。这样类就可以支持接口所指定的所有属性和成员&#xff09; 注意1&#xff1a;接口不能单独存在。不能像实例化一个类那样实例化一个接…

supervisor守护进程

2019独角兽企业重金招聘Python工程师标准>>> supervisor 是一个client/server系统,把不是守护进程的进程变成守护进程,并监控和控制类 Unix 操作系统上的进程。 upervisor就是用Python开发的一套通用的进程管理程序&#xff0c;能将一个普通的命令行进程变为后台dae…