今天小编就为大家分享一篇关于Node.js Stream ondata触发时机与顺序的探索,写的十分的全面细致,具有一定的参考价值,对此有需要的朋友可以参考学习下。如有不足之处,欢迎批评指正。
无用逻辑
当时研究pipe细节是基于Node.js v8.11.1的源码,其中针对上游的ondata事件处理有如下一段代码:
// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
//在此我向大家推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提升思维能力
src.pause();
}
}
重点关注increasedAwaitDrain变量,理解这个变量期望达到什么目的,然后仔细阅读代码,会发现if (false === ret && !increasedAwaitDrain)语句中increasedAwaitDrain变量肯定是false,因为前一行才将该变量赋值为false,这样一来这个变量就变得毫无意义。
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {}
以上就是关键的三行代码,因为Node.js是单线程且dest.write(chunk)内部没有修改变量increasedAwaitDrain
的值,那么if语句中increasedAwaitDrain
的值肯定还是false,即increasedAwaitDrain
相关逻辑没有达到所期望的目标。
无用代码出现的原因
前段虽已经分析出increasedAwaitDrain
没起到作用,但作者为什么写了这样一段逻辑呢?其实在定义increasedAwaitDrain语句的上方,作者说可能存在这样一种情况:“当我们接收到一次上游的ondata事件并尝试将数据写到下游时,上游可能同时又有一个data事件触发,而这两个ondata的数据在写入下游时可能都返回false,从而导致src._readableState.awaitDrain++
执行两次”。awaitDrain++执行两次是作者不希望看到的情况,因为下游触发drain事件时awaitDrain
相应减1,直到其值为0时才让上游重新流动,如果awaitDrain++
执行两次,下游却只触发一次drain
事件,awaitDrain
就不会为0,上游不重新流动也就无法继续读取数据。
真相的探索过程
虽然从理性上认为increasedAwaitDrain
没起到作用,但也无法肯定加绝对,自己尝试去求助,没有出现高手指点出问题所在,但一个同事听我描述后,说可能这就是个BUG,虽心中觉得可能性不大,但还是抱着试试看的心态切换到master分支上去瞅瞅,随即发现最新的代码里并没有与increasedAwaitDrain
类似的逻辑,间接说明v8.11.1分支上increasedAwaitDrain
相关逻辑的确无用。虽然比较肯定这里存在一段无用代码,但应该如何理解作者在increasedAwaitDrain
上方的注释呢?为了进一步揭露真相,自己继续花时间去看了看stream.Readable
相关代码,想知道data事件的触发时机与顺序是如何决定的。
readable流的简单原理
在进一步解释data事件的触发顺序前,简单讲一下readable流的实现原理,如果需要自己实现一个readable
流,可以使用new stream.Readable(options)
方法,其中options可包含四个属性:highWaterMark、encoding、objectMode、read。最主要的是read属性,当流的使用者需要数据时,read方法被用来从数据源获取数据,然后通过this.push(chunk)将数据传递给使用者,如果没有更多数据可供读取时使用this.push(null)表示读取结束。
const Readable = require('stream').Readable;
let letter = 'ABCDEFG'.split('');
let index = 0;
const rs = new Readable({
read(size) {
this.push(letter[index++] || null);
}
});
rs.on('data', chunk => {
console.log(chunk.toString());
});
//在此我向大家推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提升思维能力
// 输出
// A
// B
// C
// ...
这里ondata虽然没有明显调用read方法,但内部依旧是通过调用read方法结合this.push输出数据,并且在源代码内部可以发现通过参数传递的read方法实际上被赋值给this._read,然后在Readable.prototype.read中调用this._read获取数据。
灵魂代码
为了进一步说明stream.Readable的data事件触发顺序与场景,将有关官方源码经过修改和删减成如下:
function Readable(options) {
this._read = options.read; // 将参数传递的read函数赋值到this._read
}
// 使用者通过调用read方法获取数据
Readable.prototype.read = function (size) {
var state = this._readableState;
// 模拟锁,一次_read如果没有返回(this.push),后续read不会继续调用_read读取数据
if (!state.reading) {
state.reading = true;
state.sync = true; // sync用于在push方法中指示_read内部是否同步调用了push
this._read(size);
state.sync = false;
}
// _read内部如果是同步调用push,数据会放入缓冲区
// _read内部如果是异步调用push且缓冲区没有内容,数据可能emit data返回
// 尝试从缓冲区(state.buffer)中获取大小为size的数据,如果获取成功则触发data事件
if (ret)
this.emit('data', ret);
return ret;
};
// 在this._read执行过程中通过this.push输出数据
Readable.prototype.push = function (chunk, encoding) {
var state = this._readableState;
// 本次_read获取到数据,打开锁
state.reading = false;
// 流动模式 & 缓冲区没有数据 & 非同步返回,则直接触发data事件
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0); // 触发下一次读取,_read异步push的话还是会到这里,类似flow中的保持流出于流动
}
else {
// 将数据放入缓冲区
state.length += chunk.length;
state.buffer.push(chunk);
}
};
// 暂停流动
Readable.prototype.pause = function() {
if (this._readableState.flowing !== false) {
this._readableState.flowing = false;
this.emit('pause');
}
return this;
};
function flow(stream) {
const state = stream._readableState;
while (state.flowing && stream.read() !== null);
}
//在此我向大家推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提升思维能力
data事件的触发时机与顺序
时机
data的触发只有两处:
- 流如果处于流动模式 & 缓冲区没有数据 & 异步调用push,此时数据不经过缓冲区,直接触发data事件
- 不满足上述情况时,push的数据会被放入缓冲区,然后再尝试从缓冲区读取指定size的数据并触发data事件
顺序
关于data的触发顺序,实际是由emit顺序决定,为讨论原始问题:“increasedAwaitDrain相关逻辑为什么可以被删除?”,将代码简化:
let count = 0;
src.on('data', chunk => {
let ret = dest.write(chunk);
if (!ret) {
count++;
src.pause();
}
});
当监听流的data事件时,流最终会通过resume并调用flow函数进入流动模式模式,即不断的调用read方法读取数据。接下来分析以下几种场景,当dest.write(chunk)返回false时++count会执行几次,注意结合前文的灵魂代码。
//在此我向大家推荐一个前端全栈开发交流圈:619586920 突破技术瓶颈,提升思维能力
场景一:每次_read同步push一次数据
场景二:每次_read异步push一次数据
场景三:每次_read多次同步push数据
场景四:每次_read多次异步push数据
场景五:_read操作可能同步或异步push
小结
文章最终写出来的内容与我最开始的初衷所偏离,而且自己不知道如何评价这篇文章的好坏,但为了写这文章花了两天业余时间去深入理解stream.Readable却是非常有收获的一件事情,更坚定自己在写文章的路途上可以走的更远。PS:猜测为什么有烂电影的存在,可能是因为导演长时间投入的创作会让他迷失在内部而无法发现问题,写文章也是,难以通过阅读去优化费心思写的文章。
PS:下图是美团博客的,也许我写了这么多却抵不上这张图,说明方式很重要。
结语
感谢您的观看,如有不足之处,欢迎批评指正。