可读流

包含的事件:data,readable,end,close ,error,pause,resume

常用方法:resume,read,pipe,pause

实现可读流:

const {Readable} = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
// 这里传入的read方法,会被写入_read()
read: (size) => {
// size 为highWaterMark大小
// 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流
if (i < 10) {
// push其实是把数据放入缓存区
console.log(1);
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
}
}); rs.on('readable', () => {
const data = rs.read(9)
console.log(data);
})
// 结果: 当前读取数据: 0 ..... 当前读取数据: 9

readable事件

触发时机 :当有数据可从流中读取时,就会触发readable 事件。如果流有新的动态,将会触发readable,比如push了新的内容;所以下面的代码将会触发两次readable;第二次之后缓冲区的内容多于highWaterMark,不再执行read方法,结果流没有变化,进而readable不再触发。

const {Readable} = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
read: (size) => {
if (i < 2) {
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
}
});
// 在某些情况下,为 'readable' 事件附加监听器将会导致将一些数据读入内部缓冲区,也就是会调用read方法。
rs.on('readable', () => {
console.log(rs.readableFlowing); // false 进入暂停模式
const data = rs.read(1)
console.log(data);
})
// 结果 : 当 前

当到达流数据的尽头时, readable事件也会触发,但是在end事件之前触发,上面代码修改一下:

rs.on('readable', () => {
console.log('readable');
let data = ''
while (data=rs.read(1)){
console.log(rs.readableLength);
}
console.log(data);
})
// 在缓冲区内没有数据之后rs.readableLength为0,结尾有两个readable打印出来,最后一个代表流数据到达了尽头触发了readable

readable事件表明流有新的动态:要么有新的数据,要么到达流的尽头。 对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null。

data 事件

const {Readable} = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
// 这里传入的read方法,会被写入_read()
read: (size) => {
// size 为highWaterMark大小
// 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流
if (i < 6) {
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
},
destroy(err, cb) {
rs.push(null);
cb(err);
}
}); rs.on('data', (data) => {
console.log(data);
console.log(rs.readableFlowing); // true 进入流动模式
})
const {Readable} = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
read: (size) => {
if (i < 10) {
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
}
}); rs.on('readable', () => {
const data = rs.read()
console.log(data);
console.log(rs.readableFlowing); // false
})
rs.on('data', (data) => {
console.log(rs.readableFlowing); // false
console.log(data);
}) // 即便用了data事件,因为由readable事件,可读流一直处于暂停模式

移除readable重新触发data事件

const {Duplex} = require('stream');
class Duplexer extends Duplex {
constructor(props) {
super(props);
this.data = [];
} _read(size) {
const chunk = this.data.shift();
if (chunk == 'stop') {
this.push(null);
} else {
if (chunk) {
this.push(chunk);
}
}
} _write(chunk, encoding, cb) {
this.data.push(chunk);
cb();
}
} const d = new Duplexer({allowHalfOpen: true}); d.write('...大沙发撒地方是.');
d.write('阿斯顿发斯蒂芬');
d.write('阿斯顿发斯蒂芬11');
d.write('stop');
d.end() var a = function (a) {}
d.on('readable', a);
d.on('data', function (data) {
console.log(data.toString());
}); d.removeListener('readable', a)

end 事件

只有在数据被完全消费掉后才会触发; 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。

const {Readable} = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
read: (size) => {
if (i < 2) {
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
}
});
// 在某些情况下,为 'readable' 事件附加监听器将会导致将一些数据读入内部缓冲区,也就是会调用read方法。
rs.on('readable', () => {
while (data=rs.read(1)){
console.log(rs.readableLength);
}
})
rs.on('end', () => {
console.log('end');
})

highWaterMark

执行read方法的阈值;如果缓冲区内容长度大于highWaterMark,read方法将不会执行。

const {Readable} = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
read: (size) => {
if (i < 10) {
rs.push(`当前读取数据: ${i++}`);
} else {
rs.push(null);
}
}
}); rs.on('readable', () => {
console.log(rs.readableLength);
const data = rs.read(1)
console.log(data); })
// 打印结果: 9 当 17 前

解析:第二次rs.read(1)时候,缓冲区的内容长度为16,大于highWaterMark,导致不能触发内部read方法。

resume pause close事件

这个例子使用了双工流

const {Duplex} = require('stream');

class Duplexer extends Duplex {
constructor(props) {
super(props);
this.data = [];
}
_read(size) {
const chunk = this.data.shift();
if (chunk == 'stop') {
this.push(null);
} else {
if (chunk) {
this.push(chunk);
}
}
} _write(chunk, encoding, cb) {
this.data.push(chunk);
cb();
}
} const d = new Duplexer({allowHalfOpen: true}); d.write('第一行');
d.write('第二行');
d.write('第三行');
d.write('stop');
d.end() d.on('data', function (chunk) {
console.log('read: ', chunk.toString());
d.pause()
setTimeout(() => {
d.resume()
}, 2000)
}); d.on('pause',function () {
console.log('pause');
})
d.on('resume',function () {
console.log('resume');
}) d.on('close',function () {
console.log('close');
})

模式

可读流中分为2种模式流动模式和暂停模式。

暂停模式切换到流动模式i:

流动模式切换到暂停模式:

05-12 19:54