我正在研究一种数据接收器,该数据接收器最终将用于nodejs中的键值对象流。

我偶然发现了双工流,并开始与它们玩耍以弄湿自己的脚,但我尝试的所有操作似乎均不起作用。

目前,我有以下双工流:

class StorageStream extends stream.Duplex {
  constructor() {
    super({
      objectMode: true
    })

    this.values = new Map();
    this.entries = this.values.entries();
  }

  _write(data, encoding, next) {
    const { key, value } = data;

    this.values.set(key, value);

    next();
  }

  _read() {
    const next = this.entries.next();

    if (next.value) {
      this.push(next.value);
    }
  }
}


这是一个SUPER CONTRIVED示例,但从本质上讲,当我向该流写入内容时,它应该将键和值存储在Map中,而当我从该流中读取时,它应该开始从映射中读取并将它们向下传递。但是,这不起作用,基本上执行以下操作

const kvstream = createKVStreamSomeHow(); // a basic, readable stream with KV Pairs

const logger = createLoggerStreamSomeHow(); // writable stream, logs the data coming through

const storage = new StorageStream();

kvstream.pipe(storage).pipe(logger);


使过程刚刚结束。因此,我想对于_read方法内部应该做的事情有些困惑。

最佳答案

从OP提供的代码中可以看到以下几点:


read()中设置任何键之前,会生成遍历this.entries = this.values.entries();返回的键的迭代器。因此,调用read()永远不会产生输出。
如果在Map中设置了新键,则不会将其推入读取缓冲区中以供后续可写操作处理


使用内置的Transform (docs)构造函数可以简化双工的实现。转换构造函数非常适合存储转发方案。

这是在这种情况下如何应用流转换的示例。请注意,pipeline()函数不是必需的,在本示例中已使用它来简化等待可读对象发出其所有数据的过程:

const { Writable, Readable, Transform, pipeline } = require('stream');

class StorageStream extends Transform {
  constructor() {
    super({
      objectMode: true
    })

    this.values = new Map();
  }

  _transform(data, encoding, next) {
    const { key, value } = data;

    this.values.set(key, value);
    console.log(`Setting Map key ${key} := ${value}`)

    next(null, data);
  }
}

(async ()=>{
  await new Promise( resolve => {
    pipeline(
      new Readable({
        objectMode: true,
        read(){
          this.push( { key: 'foo', value: 'bar' } );
          this.push( null );
        }
      }),
      new StorageStream(),
      new Writable({
        objectMode: true,
        write( chunk, encoding, next ){
          console.log("propagated:", chunk);
          next();
        }
      }),
      (error) => {
        if( error ){
          reject( error );
        }
        else {
          resolve();
        }
      }
    );
  });
})()
  .catch( console.error );


这将产生以下输出

> Setting Map key foo := bar
> propagated: { key: 'foo', value: 'bar' }


并且可以用作

kvstream.pipe(new StorageStream()).pipe(logger);

09-17 02:18