我有一个由sourceStream
对象组成的BaseData
。
我想将此流分叉到n
个不同的流中,然后对每个BaseData
对象进行过滤和转换,使其符合自己的喜好。
最后,我想让n
流仅包含特定类型,并且分叉的流的长度可以变化,因为将来可能会删除或添加数据。
我以为可以通过fork
设置它:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
我希望现在有两个流,而
foo
-stream包含两个元素:{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
-stream包含一个元素:{ id: 'bar', data: 'narf' }
但是我得到一个错误:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
如何将一个流分成多个流?
我也尝试过链接这些调用,但是随后我只返回一个流的结果:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
仅打印:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
而不是预期的:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
可能我误解了
fork
的全部原因。根据its doc entry:Stream.fork()派生流,允许您添加其他使用者
具有共同的背压。分叉给多个消费者的流将
只能从最慢的消费者那里以最快的速度从其来源中获取价值
处理他们。
注意:不要依赖于分支之间的一致执行顺序。
此转换仅保证所有派生将处理值foo
在此之前,将处理第二个值栏。它不保证
分叉处理foo的顺序。
提示:在修改分叉中的流值时要小心(或
使用这样做的库)。由于相同的值将传递给
每个分叉,在一个分叉中所做的更改将在任何分叉中可见
之后执行。此外,执行顺序不一致,并且
您可能会遇到细微的数据损坏错误。如果需要修改
任何值,您都应制作副本并修改副本。
弃用警告:目前可以在
消费(例如通过转换)。这将不再可能
在下一个主要版本中。如果您要分流,请始终
叫叉。
因此,而不是“如何派生流?”我的实际问题可能是:如何将高地溪流即时复制成不同的溪流?
最佳答案
partnerStream.filter()
返回一个新的流。然后,您可以使用partnerStream
再次使用partnerStream.each()
,而无需调用fork()
或observe()
。因此,要么链接partnerStream.filter().each()
调用,要么将partnerStream.filter()
的返回值分配给变量,然后在该变量上调用.each()
。