我有一个由sourceStream对象组成的BaseData

我想将此流分叉到n个不同的流中,然后对每个BaseData对象进行过滤和转换,使其符合自己的喜好。

最后,我想让n流仅包含特定类型,并且分叉的流的长度可以变化,因为将来可能会删除或添加数据。

我以为可以通过fork设置它:

import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});


我希望现在有两个流,而foo -stream包含两个元素:

{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }


bar -stream包含一个元素:

{ id: 'bar', data: 'narf' }


但是我得到一个错误:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)


如何将一个流分成多个流?



我也尝试过链接这些调用,但是随后我只返回一个流的结果:

partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});


仅打印:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar


而不是预期的:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}




可能我误解了fork的全部原因。根据its doc entry


  Stream.fork()派生流,允许您添加其他使用者
  具有共同的背压。分叉给多个消费者的流将
  只能从最慢的消费者那里以最快的速度从其来源中获取价值
  处理他们。
  
  注意:不要依赖于分支之间的一致执行顺序。
  此转换仅保证所有派生将处理值foo
  在此之前,将处理第二个值栏。它不保证
  分叉处理foo的顺序。
  
  提示:在修改分叉中的流值时要小心(或
  使用这样做的库)。由于相同的值将传递给
  每个分叉,在一个分叉中所做的更改将在任何分叉中可见
  之后执行。此外,执行顺序不一致,并且
  您可能会遇到细微的数据损坏错误。如果需要修改
  任何值,您都应制作副本并修改副本。
  
  弃用警告:目前可以在
  消费(例如通过转换)。这将不再可能
  在下一个主要版本中。如果您要分流,请始终
  叫叉。


因此,而不是“如何派生流?”我的实际问题可能是:如何将高地溪流即时复制成不同的溪流?

最佳答案

partnerStream.filter()返回一个新的流。然后,您可以使用partnerStream再次使用partnerStream.each(),而无需调用fork()observe()。因此,要么链接partnerStream.filter().each()调用,要么将partnerStream.filter()的返回值分配给变量,然后在该变量上调用.each()

09-17 20:38