How to pipe multiple ReadableStreams to a single WriteStream?

Question

I'm dealing with a firewall limit where I can only POST 10MB at a time. In order to handle larger uploads, I'd like to use something like http://www.resumablejs.com, write multiple chunks to disk, and recombine them at the end.

I'm just writing tests now, but something in my implementation is wrong.

First, I split the file like this:

    const splitFile = async () => {
      const chunkSize = 1024 * 1024;
      const photo = fs.createReadStream(path.resolve(FIXTURES, 'hello-tron.jpg'));

      // Write to 2 files
      photo.on('readable', function() {
        const file1 = path.resolve(TEMP, 'chunk.jpg.1');
        const file2 = path.resolve(TEMP, 'chunk.jpg.2');
        let data;
        while (data = this.read(chunkSize)) {
          if (!fs.existsSync(file1)) {
            const output1 = fs.createWriteStream(file1);
            output1.write(data);
            output1.close();
            return;
          }
          const output2 = fs.createWriteStream(file2);
          output2.write(data);
          if (data === null) {
            output2.close();
          }
        }
      });

      return new Promise(resolve => {
        photo.on('end', resolve);
      });
    };

Then I reassemble it like this:

const recombine = async () => {
  const output = fs.createWriteStream(path.resolve(TEMP, 'recombined.jpg'));
  const file1 = path.resolve(TEMP, 'chunk.jpg.1');
  const file2 = path.resolve(TEMP, 'chunk.jpg.2');
  return new Promise(resolve => {
    const stream1 = fs.createReadStream(file1);
    const stream2 = fs.createReadStream(file2);

    const recombinator = new Recombinator({
      readables: [stream1, stream2]
    });

    stream1.on('readable', () => {
      stream2.on('readable', () => {
        recombinator.pipe(output);
      });
    });

    stream1.on('end', () => {
      stream2.on('end', () => {
        resolve();
      });
    });
  })
};

Here is the Recombinator class:

/* Takes multiple readable streams and returns a single
 * readable stream that can be piped to a writable stream
 */
const {Readable} = require('stream');

class Recombinator extends Readable {
  constructor(opts) {
    super({...opts, readables: undefined});
    const self = this;
    self.readables = opts.readables || [];
  }

  _read(size) {
    this.push(this._getChunk(size));
  }

  _getChunk(size) {
    const reader = this.readables.find(r => !r.closed);
    if (!reader) {
      return null;
    }
    const data = reader.read(size);
    if (!data) {
      reader.closed = true;
      return this._getChunk(size);
    }
    return data;
  }
}

module.exports = Recombinator;

Here is the original image:

Here is the recombined image:

Answer

Part of the problem was thinking that the readable event is only fired once, when it is actually fired each time there's data to be read. Nesting the event handlers probably wasn't great either.
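
For context, the usual Node.js pattern is to attach a single 'readable' handler and drain the stream with read() in a loop each time the event fires. A minimal sketch of that pattern (the file name here is just a placeholder):

    const fs = require('fs');

    const stream = fs.createReadStream('example.bin');

    // 'readable' fires every time new data becomes available, not just once,
    // so a single handler drains whatever is currently buffered.
    stream.on('readable', () => {
      let chunk;
      // read() returns null when the internal buffer is momentarily empty;
      // the next 'readable' event signals that more data has arrived.
      while ((chunk = stream.read()) !== null) {
        console.log(`read ${chunk.length} bytes`);
      }
    });

    stream.on('end', () => console.log('no more data'));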

The solution I'm using is to change the Recombinator constructor like so:

  constructor(opts) {
    super({...opts, readables: undefined});
    const self = this;
    self.readables = opts.readables || [];

    self._readableCount = 0;
    self._endedCount = 0;

    // Attach listeners to know when all readables are open and closed
    self.readables.forEach(r => {
      r.on('readable', () => {
        if (r._markedReadable) {
          return;
        }
        r._markedReadable = true;
        self._readableCount++;
      });
      r.on('end', () => {
        if (r._markedEnded) {
          return;
        }
        r._markedEnded = true;
        self._endedCount++;
      });
    })
  }

and adding async methods so I can wait until all readables are open (and, later, until they have all ended), like so:

  async ready(retry = 10) {
    if (this._readableCount === this.readables.length) {
      return Promise.resolve();
    }
    if (retry === 0) {
      return Promise.reject(`Timeout waiting for ${this.readables.length} readables to open - got ${this._readableCount}`);
    }
    await delay(500);
    return this.ready(retry - 1);
  }

  async done(retry = 10) {
    if (this._endedCount === this.readables.length) {
      return Promise.resolve();
    }
    if (retry === 0) {
      return Promise.reject(`Timeout waiting for ${this.readables.length} readables to end - got ${this._endedCount}`);
    }
    await delay(500);
    return this.done(retry - 1);
  }
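
Note that ready() and done() rely on a delay helper that isn't shown in the answer; presumably it is a promise-based sleep. Below is a minimal sketch of that helper, plus one possible way to wire the updated Recombinator into the recombine step. This wiring is an assumption rather than part of the original answer; fs, path, TEMP and Recombinator refer to the earlier snippets.

    // Assumed promise-based sleep helper backing ready()/done() above.
    const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

    // Hypothetical recombine flow using the updated Recombinator:
    // wait for both readables to open, pipe, then wait for both to end.
    const recombine = async () => {
      const output = fs.createWriteStream(path.resolve(TEMP, 'recombined.jpg'));
      const stream1 = fs.createReadStream(path.resolve(TEMP, 'chunk.jpg.1'));
      const stream2 = fs.createReadStream(path.resolve(TEMP, 'chunk.jpg.2'));

      const recombinator = new Recombinator({readables: [stream1, stream2]});

      await recombinator.ready(); // resolves once both sources have emitted 'readable'
      recombinator.pipe(output);
      await recombinator.done();  // resolves once both sources have emitted 'end'
    };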
