本文介绍了如何关闭Akka流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在将一些实验喷雾代码更新为akka-stream(2.0.2),因此我将逐步阅读他的文档。我的需求之一是,如果我在流中检测到协议冲突,则需要立即关闭流并启动客户端。
I am updating some experimental spray code to akka-stream (2.0.2), so I'm going through he documentation little by little. One of the needs I have is that if I detect a protocol violation in my stream, I need to close the stream immediately and kick off the client.
什么是适当的立即从Flow内部关闭(终止)流的方法?
What is the proper way of immediately closing (terminating) the stream from inside a Flow?
推荐答案
使用 PushStage
:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
您可以将 PushStage
与 Flow
通过 transform
方法:
Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)
这篇关于如何关闭Akka流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!