本文介绍了如何关闭Akka流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在将一些实验喷雾代码更新为akka-stream(2.0.2),因此我将逐步阅读他的文档。我的需求之一是,如果我在流中检测到协议冲突,则需要立即关闭流并启动客户端。

I am updating some experimental spray code to akka-stream (2.0.2), so I'm going through he documentation little by little. One of the needs I have is that if I detect a protocol violation in my stream, I need to close the stream immediately and kick off the client.

什么是适当的立即从Flow内部关闭(终止)流的方法?

What is the proper way of immediately closing (terminating) the stream from inside a Flow?

推荐答案

使用 PushStage

import akka.stream.stage._

val closeStage = new PushStage[Tpe, Tpe] {
  override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
    case elem if shouldCloseStream ⇒
      // println("stream closed")
      ctx.finish()
    case elem ⇒
      ctx.push(elem)
  }
}

您可以将 PushStage Flow 通过 transform 方法:

Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)

这篇关于如何关闭Akka流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-31 06:36