nodejs stream 手册

https://github.com/jabez128/stream-handbook

在node中,流可以帮助我们将事情的重点分为几份,因为使用流可以帮助我们将实现接口的部分分割成一些连续的接口,这些接口都是可重用的。接着,你可以将一个流的输出口接到另一个流的输入口,然后使用使用一些库来对流实现高级别的控制。(metamask中就是这样,使用了很多流,希望这几天的学习能够让我真正搞懂metamask几个部分之间到底是怎么相互工作的)

为什么应该使用流

在node中,I/O都是异步的,所以在和硬盘以及网络的交互过程中会涉及到传递回调函数的过程。你之前可能会写出这样的代码:

var http = require('http');//node自带
var fs = require('fs'); var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);//浏览器上显示
console.log(data);//终端上显示
});
});
server.listen();

上面的这段代码并没有什么问题,但是在每次请求时,我们都会把整个data.txt文件读入到内存中,然后再把结果返回给客户端。想想看,如果data.txt文件非常大,在响应大量用户的并发请求时,程序可能会消耗大量的内存,这样很可能会造成用户连接缓慢的问题。

其次,上面的代码可能会造成很不好的用户体验,因为用户在接收到任何的内容之前首先需要等待程序将文件内容完全读入到内存中。

所幸的是,(req,res)参数都是流对象,这意味着我们可以使用一种更好的方法来实现上面的需求:

var http = require('http');
var fs = require('fs'); var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
console.log(stream);
});
server.listen();

console.log(stream)结果为:

ReadStream {
_readableState:
ReadableState {
objectMode: false,
highWaterMark: ,
buffer: BufferList { length: },
length: ,
pipes:
ServerResponse {
_events: [Object],
_eventsCount: ,
_maxListeners: undefined,
output: [],
outputEncodings: [],
outputCallbacks: [],
outputSize: ,
writable: true,
_last: false,
chunkedEncoding: false,
shouldKeepAlive: true,
useChunkedEncodingByDefault: true,
sendDate: true,
_removedConnection: false,
_removedContLen: false,
_removedTE: false,
_contentLength: null,
_hasBody: true,
_trailer: '',
finished: false,
_headerSent: false,
socket: [Socket],
connection: [Socket],
_header: null,
_onPendingData: [Function: bound updateOutgoingData],
_sent100: false,
_expect_continue: false,
[Symbol(isCorked)]: false,
[Symbol(outHeadersKey)]: null },
pipesCount: ,
flowing: true,
ended: false,
endEmitted: false,
reading: false,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
resumeScheduled: true,
emitClose: false,
destroyed: false,
defaultEncoding: 'utf8',
awaitDrain: ,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
_events:
{ end: [ [Function], [Function] ], data: [Function: ondata] },
_eventsCount: ,
_maxListeners: undefined,
path: '/Users/user/stream-learning/data.txt',
fd: null,
flags: 'r',
mode: ,
start: undefined,
end: Infinity,
autoClose: true,
pos: undefined,
bytesRead: ,
closed: false }

在这里,.pipe()方法会自动帮助我们监听dataend事件。上面的这段代码不仅简洁,而且data.txt文件中每一小段数据都将源源不断的发送到客户端。

除此之外,使用.pipe()方法还有别的好处,比如说它可以自动控制后端压力,以便在客户端连接缓慢的时候node可以将尽可能少的缓存放到内存中。

想要将数据进行压缩?我们可以使用相应的流模块完成这项工作!

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor'); var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
    console.log(oppressor(req));
    console.log(stream);
});
server.listen();

返回:

Stream {//oppressor(req)
_events:
{ setHeader: [Function],
writeHead: [Function],
response: [Function] },
_eventsCount: ,
_maxListeners: undefined,
writable: true,
readable: true,
write: [Function: bound ],
end: [Function: bound ],
destroy: [Function: bound destroy],
pause: [Function: bound ],
resume: [Function: bound ],
pipe: [Function],
statusCode: ,
writeContinue: [Function],
writeHead: [Function],
setHeader: [Function],
sendDate: [Function],
getHeader: [Function],
removeHeader: [Function],
addTrailers: [Function] } ReadStream {//stream
_readableState:
ReadableState {
objectMode: false,
highWaterMark: ,
buffer: BufferList { length: },
length: ,
pipes:
Stream {
_events: [Object],
_eventsCount: ,
_maxListeners: undefined,
writable: true,
readable: true,
write: [Function: bound ],
end: [Function: bound ],
destroy: [Function: bound destroy],
pause: [Function: bound ],
resume: [Function: bound ],
pipe: [Function],
statusCode: [Getter/Setter],
writeContinue: [Function],
writeHead: [Function],
setHeader: [Function],
sendDate: [Function],
getHeader: [Function],
removeHeader: [Function],
addTrailers: [Function] },
pipesCount: ,
flowing: true,
ended: false,
endEmitted: false,
reading: false,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
resumeScheduled: true,
emitClose: false,
destroyed: false,
defaultEncoding: 'utf8',
awaitDrain: ,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
_events:
{ end: [ [Function], [Function] ], data: [Function: ondata] },
_eventsCount: ,
_maxListeners: undefined,
path: '/Users/user/stream-learning/data.txt',
fd: null,
flags: 'r',
mode: ,
start: undefined,
end: Infinity,
autoClose: true,
pos: undefined,
bytesRead: ,
closed: false }

通过上面的代码,我们成功的将发送到浏览器端的数据进行了gzip压缩。我们只是使用了一个oppressor模块来处理这件事情。

一旦你学会使用流api,你可以将这些流模块像搭乐高积木或者像连接水管一样拼凑起来,从此以后你可能再也不会去使用那些没有流API的模块获取和推送数据了。

流模块基础

在node中,一共有五种类型的流:readable,writable,transform,duplex以及"classic"

pipe

无论哪一种流,都会使用.pipe()方法来实现输入和输出。

.pipe()函数很简单,它仅仅是接受一个源头src并将数据输出到一个可写的流dst中:

src.pipe(dst)

.pipe(dst)将会返回dst因此你可以链式调用多个流:

a.pipe(b).pipe(c).pipe(d)

上面的代码也可以等价为:

a.pipe(b);
b.pipe(c);
c.pipe(d);

这和你在unix中编写流代码很类似:

a | b | c | d

只不过此时你是在node中编写而不是在shell中!

readable流

Readable流可以产出数据,你可以将这些数据传送到一个writable,transform或者duplex流中,只需要调用pipe()方法:

readableStream.pipe(dst)

创建一个readable流

现在我们就来创建一个readable流!

var http = require('http');
var fs = require('fs');
var Readable = require('stream').Readable; var server = http.createServer(function (req, res) {
var rs = new Readable;
console.log(rs);
rs.push('beep ');
rs.push('boop\n');
rs.push(null);//告诉rs输出数据应该结束了
console.log(rs);
rs.pipe(res);
});
server.listen();

返回:

Readable {
_readableState:
ReadableState {
objectMode: false,
highWaterMark: ,
buffer: BufferList { length: },
length: ,
pipes: null,
pipesCount: ,
flowing: null,
ended: false,
endEmitted: false,
reading: false,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
resumeScheduled: false,
emitClose: true,
destroyed: false,
defaultEncoding: 'utf8',
awaitDrain: ,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
_events: {},
_eventsCount: ,
_maxListeners: undefined }
Readable {
_readableState:
ReadableState {
objectMode: false,
highWaterMark: ,
buffer: BufferList { length: },//push进来的beep boop存放在BufferList缓存中
length: ,
pipes: null,
pipesCount: ,
flowing: null,
ended: true,
endEmitted: false,
reading: false,
sync: true,
needReadable: false,
emittedReadable: true,
readableListening: false,
resumeScheduled: false,
emitClose: true,
destroyed: false,
defaultEncoding: 'utf8',
awaitDrain: ,
readingMore: true,
decoder: null,
encoding: null },
readable: true,
_events: {},
_eventsCount: ,
_maxListeners: undefined }

需要注意的一点是我们在将数据输出到process.stdout之前已经将内容推送进readable流rs中,但是所有的数据依然是可写的。

这是因为在你使用.push()将数据推进一个readable流中时,一直要到另一个东西来消耗数据之前,数据都会存在一个缓存中。

然而,在更多的情况下,我们想要的是当需要数据时数据才会产生,以此来避免大量的缓存数据。

我们可以通过定义一个._read函数(即内部函数)来实现按需推送数据:

var http = require('http');
var fs = require('fs');
var Readable = require('stream').Readable; var server = http.createServer(function (req, res) {
var rs = new Readable;
var c = ;
console.log(rs);
rs._read = function () {//通过重写_read()方法实现了只有在数据接受者请求数据才向可读流中压入数据
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt()) rs.push(null);
};
console.log(rs);
rs.pipe(res);
});
server.listen();

返回:

Readable {
_readableState:
ReadableState {
objectMode: false,
highWaterMark: ,
buffer: BufferList { length: },
length: ,
pipes: null,
pipesCount: ,
flowing: null,
ended: false,
endEmitted: false,
reading: false,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
resumeScheduled: false,
emitClose: true,
destroyed: false,
defaultEncoding: 'utf8',
awaitDrain: ,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
_events: {},
_eventsCount: ,
_maxListeners: undefined }
Readable {
_readableState:
ReadableState {
objectMode: false,
highWaterMark: ,
buffer: BufferList { length: },//这里与上面不同,可以看出并没有存放在缓存中
length: ,
pipes: null,
pipesCount: ,
flowing: null,
ended: false,
endEmitted: false,
reading: false,
sync: true,
needReadable: false,
emittedReadable: false,
readableListening: false,
resumeScheduled: false,
emitClose: true,
destroyed: false,
defaultEncoding: 'utf8',
awaitDrain: ,
readingMore: false,
decoder: null,
encoding: null },
readable: true,
_events: {},
_eventsCount: ,
_maxListeners: undefined,
_read: [Function] }//自定义的_read函数

在这里我们将字母az推进了rs中,但是只有当数据消耗者出现时,数据才会真正实现推送。

出错:

如果将_read函数的_去掉,通过浏览器连接服务器访问时会出现错误:

events.js:
throw er; // Unhandled 'error' event
^ Error [ERR_STREAM_PUSH_AFTER_EOF]: stream.push() after EOF
at readableAddChunk (_stream_readable.js::)
at Readable.push (_stream_readable.js::)
at Readable.rs.read (/Users/user/stream-learning/index.js::)
at flow (_stream_readable.js::)
at resume_ (_stream_readable.js::)
at process._tickCallback (internal/process/next_tick.js::)
Emitted 'error' event at:
at readableAddChunk (_stream_readable.js::)
at Readable.push (_stream_readable.js::)
[... lines matching original stack trace ...]
at process._tickCallback (internal/process/next_tick.js::)
Program node --harmony index.js exited with code
ERR_STREAM_PUSH_AFTER_EOF的意思是:
An attempt was made to call stream.push() after a null(EOF,end of file,表示文件末尾) had been pushed to the stream.
表示在rs.push(null)之后,还是有数据继续push,这是因为非内部函数的read在遇见push(null)后并没有退出该函数的原因

_read函数也可以获取一个size参数来指明消耗者想要读取多少比特的数据,但是这个参数是可选的。

需要注意到的是你可以使用util.inherit()来继承一个Readable流。

为了说明只有在数据消耗者出现时,_read函数才会被调用,我们可以将上面的代码简单的修改一下

为了说明只有在数据接受者请求数据时_read()方法才被调用,我们在向可读流压入数据时做一个延时setTimeout:

var Readable = require('stream').Readable;
var rs = new Readable;
var c = 97 - 1; rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);     setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
}; rs.pipe(process.stdout);//1 当head的5或7次结束后,操作系统将会发送一个SIGPIPE信号 process.on('exit', function () {//3 下面的error的process.exit将会触发这个exit事件,然后在终端输出下面的语句
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);// 2 此时process.stdout将会捕获到一个EPIPE错误

运行上面的代码我们可以发现如果我们只请求5比特的数据,那么_read只会运行5次:

userdeMacBook-Pro:stream-learning user$ node index.js | head -c5  //-c5即运行5次
abcde
_read() called 5 times
userdeMacBook-Pro:stream-learning user$ node index.js | head -c7
abcdefg
_read() called 7 times

在上面的代码中,setTimeout很重要,因为操作系统需要花费一些时间来发送程序结束信号。

另外,process.stdout.on('error',fn)处理器也很重要,因为当head不再关心我们的程序输出时,操作系统将会向我们的进程发送一个SIGPIPE信号,此时process.stdout将会捕获到一个EPIPE错误。

上面这些复杂的部分在和操作系统相关的交互中是必要的,但是如果你直接和node中的流交互的话,则可有可无。

如果你创建了一个readable流,并且想要将任何的值推送到其中的话,确保你在创建流的时候指定了objectMode参数,Readable({ objectMode: true })

消耗一个readable流

大部分时候,将一个readable流直接pipe到另一种类型的流或者使用through或者concat-stream创建的流中,是一件很容易的事情。但是有时我们也会需要直接来消耗一个readable流。

process.stdin.on('readable', function () {//从echo得到输入流后直接在终端输出以消耗,sleep是停止1秒
var buf = process.stdin.read();
console.dir(buf);
});

返回:

userdeMacBook-Pro:stream-learning user$ (echo abc; sleep ; echo def; sleep ; echo ghi) | node index.js
Buffer [Uint8Array] [ , , , ]
Buffer [Uint8Array] [ , , , ]
Buffer [Uint8Array] [ , , , ]
null

当数据可用时,readable事件将会被触发,此时你可以调用.read()方法来从缓存中获取这些数据。

当流结束时,.read()将返回null,因为此时已经没有更多的字节可以供我们获取了。

你也可以告诉.read()方法来返回n个字节的数据。虽然所有核心对象中的流都支持这种方式,但是对于对象流来说这种方法并不可用。

下面是一个例子,在这里我们制定每次读取3个字节的数据:

process.stdin.on('readable', function () {
var buf = process.stdin.read();//之前一次输入为3个字节,但是还有一个字节做结束符,即10
console.dir(buf);//这里改成3个后,结束字符将会留到下一次输出
});

返回:

userdeMacBook-Pro:stream-learning user$ (echo abc; sleep ; echo def; sleep ; echo ghi) | node index.js
Buffer [Uint8Array] [ , , ]
Buffer [Uint8Array] [ , , ]
Buffer [Uint8Array] [ , , ]
Buffer [Uint8Array] [ , , ]

writable流

一个writable流指的是只能流进不能流出的流:

src.pipe(writableStream)

创建一个writable流

只需要定义一个._write(chunk,enc,next)函数,你就可以将一个readable流的数据释放到其中:

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
}; process.stdin.pipe(ws);

代码运行结果如下所示:

userdeMacBook-Pro:stream-learning user$ (echo beep; sleep ; echo boop) | node index.js
Buffer [Uint8Array] [ , , , , ]
Buffer [Uint8Array] [ , , , , ]

第一个参数,chunk代表写进来的数据,readable流。

第二个参数enc代表编码的字符串,但是只有在opts.decodeStringfalse的时候你才可以写一个字符串。

第三个参数,next(err)是一个回调函数,使用这个回调函数你可以告诉数据消耗者可以写更多的数据。你可以有选择性的传递一个错误对象error,这时会在流实体上触发一个emit事件。

在从一个readable流向一个writable流传数据的过程中,数据会自动被转换为Buffer对象,除非你在创建writable流的时候制定了decodeStrings参数为false,Writable({decodeStrings: false})

如果你需要传递对象,需要指定objectMode参数为trueWritable({ objectMode: true })

向一个writable流中写东西

如果你需要向一个writable流中写东西,只需要调用.write(data)即可。

process.stdout.write('beep boop\n');

为了告诉一个writable流你已经写完毕了,只需要调用.end()方法。你也可以使用.end(data)在结束前再写一些数据。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt'); ws.write('beep '); setTimeout(function () {
ws.end('boop\n');
}, );

返回:

userdeMacBook-Pro:stream-learning user$ node index.js
userdeMacBook-Pro:stream-learning user$ cat message.txt
beep boop

如果你需要调整内部缓冲区大小,那么需要在创建可写流对象时设置highWaterMark。在调用.write()方法返回false时,说明写入的数据大小超过了该值。

为了避免读写速率不匹配而造成内存上涨,可以监听drain事件,等待可写流内部缓存被清空再继续写入。

transform流

你可以将transform流想象成一个流的中间部分,它可以读也可写,但是并不保存数据,它只负责处理流经它的数据。

duplex流

Duplex流是一个可读也可写的流,就好像一个电话,可以接收也可以发送语音。一个rpc交换是一个duplex流的最好的例子。如果你看到过下面这样的代码:

a.pipe(b).pipe(a)

那么你需要处理的就是一个duplex流对象。

classic流

Classic流是一个古老的接口,最早出现在node 0.4中。虽然现在不怎么用,但是我们最好还是来了解一下它的工作原理。

无论何时,只要一个流对象注册了一个data监听器,它就会自动的切换到classic模式,并且根据旧API的方式运行。

classic readable流

Classic readable流只是一个事件发射器,当有数据消耗者出现时发射emit事件,当输出数据完毕时发射end事件。

我们可以同构检查stream.readable来检查一个classic流对象是否可读。

下面是一个简单的readable流对象的例子,程序的运行结果将会输出AJ

var Stream = require('stream');
var stream = new Stream;
stream.readable = true; var c = ;
var iv = setInterval(function () {//隔100ms运行这个函数一次
if (++c >= ) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, ); stream.pipe(process.stdout);

运行结果如下所示:

userdeMacBook-Pro:stream-learning user$ node index.js //结果是一个个出来的
ABCDEFGHIJ

为了从一个classic readable流中读取数据,你可以注册dataend监听器。下面是一个使用旧readable流方式从process.stdin中读取数据的例子:

process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});

运行结果如下所示:

ABCDEFGHIJuserdeMacBook-Pro:stream-learning user$ (echo beep; sleep ; echo boop) | node index.js
<Buffer 0a>
<Buffer 6f 6f 0a>
__END__

需要注意的一点是当你在一个流对象上注册了一个data监听器,你就将这个流放在了兼容模式下,此时你不能使用两个stream2的api。

如果你自己创建流对象,永远不要绑定dataend监听器。如果你需要和旧版本的流兼容,最好使用第三方库来实现.pipe()方法。

例如,你可以使用through模块来避免显式的使用dataend监听器:

var through = require('through');
process.stdin.pipe(through(write, end)); function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}

运行结果是一样的

你也可以使用concat-stream模块来将整个流的内容缓存起来:

var concat = require('concat-stream');//install
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));

返回:

userdeMacBook-Pro:stream-learning user$ echo '{"beep":"boop"}' | node index.js
{ beep: 'boop' }

Classic readable流拥有.pause().resume()逻辑来暂停一个流,但是这都是可选的。如果你想要使用.pause().resume()方法,你应该使用through模块来帮助你处理缓存。

classic writable流

Classic writable流非常简单。其中只定义了.write(buf).end(buf),以及.desctory()方法。其中.end(buf)的参数buf是可选参数,但是一般来说node程序员还是喜欢使用.end(buf)这种写法。

然后接下来就是去看自己之前写的node文档学习的stream部分——stream-nodejs,本博客nodejs-stream部分

之前看过现在基本上没有什么感觉了,好好回顾一下,把stream部分学好

04-13 20:58