有一个具有自定义流的流,在某个阶段,我想拆分该流,并有两个替代的数据处理,稍后将再次合并。
例如。
-> F3 -> F6
Src -> F1 -> F2 > Merge -> Sink
-> F4 -> F5
F2
应该有一个条件,说明数据是否包含A
格式,则应转到F3
流程,否则转到F4
。据我所知,每个流在每个方向上只能有一个端口(或比迪,则为两个)-那么我如何支持这种流?
最佳答案
您可以使用Broadcast
拆分流,然后可以在每个流上使用filter
或collect
过滤所需的数据。
val split = builder.add(Broadcast[Int](2))
Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
-> filterCondB -> F4 -> F5 -> Merge
另外,还有
Partition
阶段,处理输出端口的数量以及从值到端口号f: T => Int
的映射函数。val portMapper(value: T): Int = value match {
case CondA => 0
case CondB => 1
}
val split = builder.add(Partition[T](2, portMapper))
Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
split -> F4 -> F5 -> Merge
也许有一些更简单的方法。
关于scala - 基于akka流条件的替代流,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36681879/