有一个具有自定义流的流,在某个阶段,我想拆分该流,并有两个替代的数据处理,稍后将再次合并。

例如。

                  -> F3 -> F6
Src -> F1 -> F2                > Merge -> Sink
                  -> F4 -> F5
F2应该有一个条件,说明数据是否包含A格式,则应转到F3流程,否则转到F4

据我所知,每个流在每个方向上只能有一个端口(或比迪,则为两个)-那么我如何支持这种流?

最佳答案

您可以使用Broadcast拆分流,然后可以在每个流上使用filtercollect过滤所需的数据。

val split = builder.add(Broadcast[Int](2))

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
                   -> filterCondB -> F4 -> F5 -> Merge

另外,还有Partition阶段,处理输出端口的数量以及从值到端口号f: T => Int的映射函数。
val portMapper(value: T): Int = value match {
  case CondA => 0
  case CondB => 1
}

val split = builder.add(Partition[T](2, portMapper))

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
             split -> F4 -> F5 -> Merge

也许有一些更简单的方法。

关于scala - 基于akka流条件的替代流,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36681879/

10-13 00:29