我正在研究一种数据接收器,该数据接收器最终将用于nodejs中的键值对象流。
我偶然发现了双工流,并开始与它们玩耍以弄湿自己的脚,但我尝试的所有操作似乎均不起作用。
目前,我有以下双工流:
class StorageStream extends stream.Duplex {
constructor() {
super({
objectMode: true
})
this.values = new Map();
this.entries = this.values.entries();
}
_write(data, encoding, next) {
const { key, value } = data;
this.values.set(key, value);
next();
}
_read() {
const next = this.entries.next();
if (next.value) {
this.push(next.value);
}
}
}
这是一个SUPER CONTRIVED示例,但从本质上讲,当我向该流写入内容时,它应该将键和值存储在Map中,而当我从该流中读取时,它应该开始从映射中读取并将它们向下传递。但是,这不起作用,基本上执行以下操作
const kvstream = createKVStreamSomeHow(); // a basic, readable stream with KV Pairs
const logger = createLoggerStreamSomeHow(); // writable stream, logs the data coming through
const storage = new StorageStream();
kvstream.pipe(storage).pipe(logger);
使过程刚刚结束。因此,我想对于
_read
方法内部应该做的事情有些困惑。 最佳答案
从OP提供的代码中可以看到以下几点:
在read()
中设置任何键之前,会生成遍历this.entries = this.values.entries();
返回的键的迭代器。因此,调用read()永远不会产生输出。
如果在Map中设置了新键,则不会将其推入读取缓冲区中以供后续可写操作处理
使用内置的Transform (docs)构造函数可以简化双工的实现。转换构造函数非常适合存储转发方案。
这是在这种情况下如何应用流转换的示例。请注意,pipeline()
函数不是必需的,在本示例中已使用它来简化等待可读对象发出其所有数据的过程:
const { Writable, Readable, Transform, pipeline } = require('stream');
class StorageStream extends Transform {
constructor() {
super({
objectMode: true
})
this.values = new Map();
}
_transform(data, encoding, next) {
const { key, value } = data;
this.values.set(key, value);
console.log(`Setting Map key ${key} := ${value}`)
next(null, data);
}
}
(async ()=>{
await new Promise( resolve => {
pipeline(
new Readable({
objectMode: true,
read(){
this.push( { key: 'foo', value: 'bar' } );
this.push( null );
}
}),
new StorageStream(),
new Writable({
objectMode: true,
write( chunk, encoding, next ){
console.log("propagated:", chunk);
next();
}
}),
(error) => {
if( error ){
reject( error );
}
else {
resolve();
}
}
);
});
})()
.catch( console.error );
这将产生以下输出
> Setting Map key foo := bar
> propagated: { key: 'foo', value: 'bar' }
并且可以用作
kvstream.pipe(new StorageStream()).pipe(logger);