我有一个 SourceQueue 。当我向 this 提供一个元素时,我希望它通过 Stream 并且当它到达 Sink 时将输出返回到提供此元素的代码(类似于 Sink.head 将一个元素返回给 RunnableGraph.run() 调用)。

我如何实现这一目标?我的问题的一个简单例子是:

val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()

val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"

编辑: 对于我在这个问题中给出的例子 Vladimir Matveev 提供了最好的答案。但是,应该注意的是,此解决方案仅在元素以它们提供给 sink 的相同顺序进入 source 时才有效。如果无法保证,sink 中元素的顺序可能会有所不同,结果可能会与预期不同。

最佳答案

我相信使用现有的原语从名为 Sink.queue 的流中提取值更简单。下面是一个例子:

val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))

val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()

def getNext: String = Await.result(sinkQueue.pull(), 1.second).get

sourceQueue.offer("foo")
println(getNext)

sourceQueue.offer("bar")
println(getNext)

sourceQueue.offer("baz")
println(getNext)

它完全符合您的要求。

请注意,为队列接收器设置 inputBuffer 属性对于您的用例可能重要也可能不重要 - 如果您不设置它,缓冲区将为零大小并且数据不会流过流,直到您调用接收器上的 pull() 方法。
sinkQueue.pull() 产生一个 Future[Option[T]] ,如果接收器接收到一个元素,它将使用 Some 成功完成,或者如果流失败则失败。如果流正常完成,它将使用 None 完成。在这个特定示例中,我通过使用 Option.get 忽略了这一点,但您可能希望添加自定义逻辑来处理这种情况。

关于scala - Akka Stream 从 Sink 返回对象,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41035707/

10-09 21:37