是否有Akka流组合器来执行以下操作(或达到此目的的某些操作)? (现在我们称它为and。)

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat]

语义是无论源如何,其元素都将传递给Flow,并且它们的输出将被合并为一个新的Flow作为元组。 (对于那些熟悉类别理论风格的函数式编程的箭头的人,我正在寻找类似&&&的东西。)

库中有两个看起来很相关的组合器,即zipalsoTo。但是前者接受SourceShape,而后者接受SinkShape。都不会接受GraphShape。为什么会这样呢?

我的用例如下:
someSource
  .via(someFlowThatReturnsUnit.and(Flow.apply))
  .runWith(someSink)

找不到类似.and的东西,我这样修改了原始的Flow:
someSource
  .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs)
  .runWith(someSink)

这行得通,但我正在寻找更清洁,更合成的解决方案。

最佳答案

公告

正如Viktor Klang在评论中指出的那样:仅当已知两个流Tuple2[O,O2]flow1相对于输入元素计数和输出元素计数都是1:1时,才可以将zipt压缩为flow2

基于图的解决方案

可以在Graph内部创建一个元组构造。实际上,您的问题几乎完全符合以下示例:

scala - 如何并排组成两个流程?-LMLPHP

在链接中扩展示例代码,可以使用BroadcastZip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))

  val merge = builder.add(Zip[Int, Int]()) //different than link

  val f1, f2, f4 = Flow[Int].map(_ + 10)

  val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
              bcast ~> f4 ~> merge
  ClosedShape
})//end RunnableGraph.fromGraph

某种Hacky流解决方案

如果您正在寻找纯流解决方案,则可以使用中间流,但是Mat不会被维护,并且它涉及每个输入元素2个流的实现:
def andFlows[I, O, O2] (maxConcurrentSreams : Int)
                       (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
                       (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] =
  Flow[I].mapAsync(maxConcurrentStreams){ i =>

    val o  : Future[O]  = Source
                           .single(i)
                           .via(flow1)
                           .to(Sink.head[O])
                           .run()

    val o2 : Future[O2] = Source
                           .single(i)
                           .via(flow2)
                           .to(Sink.head[O2])
                           .run()

    o zip o2
  }//end Flow[I].mapAsync

通用压缩

如果要使这种压缩通用,则对于大多数Flows而言,输出类型必须为(Seq[O], Seq[O2])。可以通过使用Sink.seq而不是上面的Sink.head函数中的andFlows来生成此类型:
def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
                              (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
                              (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] =
  Flow[I].mapAsync(maxConcurrentStreams){ i =>

    val o  : Future[Seq[O]]  = Source
                                .single(i)
                                .via(flow1)
                                .to(Sink.seq[O])
                                .run()

    val o2 : Future[Seq[O2]] = Source
                                .single(i)
                                .via(flow2)
                                .to(Sink.seq[O2])
                                .run()

    o zip o2
  }//end Flow[I].mapAsync

关于scala - 如何并排组成两个流程?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47367859/

10-11 22:35
查看更多