我有一个 SourceQueue
。当我向 this 提供一个元素时,我希望它通过 Stream
并且当它到达 Sink
时将输出返回到提供此元素的代码(类似于 Sink.head
将一个元素返回给 RunnableGraph.run()
调用)。
我如何实现这一目标?我的问题的一个简单例子是:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
编辑: 对于我在这个问题中给出的例子 Vladimir Matveev 提供了最好的答案。但是,应该注意的是,此解决方案仅在元素以它们提供给
sink
的相同顺序进入 source
时才有效。如果无法保证,sink
中元素的顺序可能会有所不同,结果可能会与预期不同。 最佳答案
我相信使用现有的原语从名为 Sink.queue
的流中提取值更简单。下面是一个例子:
val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
def getNext: String = Await.result(sinkQueue.pull(), 1.second).get
sourceQueue.offer("foo")
println(getNext)
sourceQueue.offer("bar")
println(getNext)
sourceQueue.offer("baz")
println(getNext)
它完全符合您的要求。
请注意,为队列接收器设置
inputBuffer
属性对于您的用例可能重要也可能不重要 - 如果您不设置它,缓冲区将为零大小并且数据不会流过流,直到您调用接收器上的 pull()
方法。sinkQueue.pull()
产生一个 Future[Option[T]]
,如果接收器接收到一个元素,它将使用 Some
成功完成,或者如果流失败则失败。如果流正常完成,它将使用 None
完成。在这个特定示例中,我通过使用 Option.get
忽略了这一点,但您可能希望添加自定义逻辑来处理这种情况。关于scala - Akka Stream 从 Sink 返回对象,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41035707/