说我有两个来源:

val ticks = Source(1 to 10)
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])

我想在Akka Stream中创建一个Graph[...]处理步骤,基于ticks流的当前值,它在值流中消耗的尽可能多。因此,例如,当值匹配时,我想返回第二个源中所有匹配的元素,否则继续打勾,得到类似以下的输出:
(1, None)
(2, None)
(3, Some(Seq(3)))
(4, Some(Seq(4, 4)))
(5, None)
(6, None)
(7, Some(Seq(7)))
(8, Some(Seq(8,8,8,8)))
(9, Some(Seq(9)))
(10, None)

您将如何实现这种行为?

最佳答案



根据该站点,您可以像这样实现GraphStage:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

val in = Inlet[E]("AccumulateWhileUnchanged.in")
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

override def shape = FlowShape(in, out)
}

关于此主题还有一篇博客文章:http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

希望这可以帮助 :)

关于akka-stream - 一个如何基于另一流控制Akka流的流,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40767098/

10-12 00:13
查看更多