鉴于我有一个很长的事件流正在流过某些东西,如下所示。经过很长时间后,将创建许多不再需要的子流。
case class Wid(id: Int, v: String, expires: LocalDateTime)
test("Substream with scan") {
val (pub, sub) = TestSource.probe[Wid]
.groupBy(Int.MaxValue, _.id)
.scan("")((a: String, b: Wid) => a + b.v)
.mergeSubstreams
.toMat(TestSink.probe[String])(Keep.both)
.run()
}
最佳答案
TL; DR 您可以在一段时间后关闭子流。但是,使用输入来通过内置阶段动态设置时间是另一回事。
关闭子流
要关闭流,通常(从上游)完成它,但也可以(从下游)取消它。例如,一旦take(n: Int)
元素通过,n
流将取消。
现在,在groupBy
情况下,您不能完成一个子流,因为所有子流都共享上游,但是您可以取消它。如何取决于要结束的条件。
但是,请注意groupBy
会删除已经关闭的子流的输入:如果在关闭3
-substream之后,一个ID为groupBy
的新元素从上游到3
,则它将被忽略,而下一个元素将被拉出原因可能是在关闭和重新打开子流之间的过程中可能丢失了某些元素。另外,如果您的流应该运行很长时间,这将影响性能,因为在转发到相关(实时)子流之前,将根据已关闭子流的列表检查每个元素。如果您对此不满意,则可能需要实现自己的状态过滤器(例如,使用Bloom过滤器)。
要关闭子流,我通常使用take
(如果您只想要给定数量的元素,但是在无限流上可能不是这种情况),或者某种超时:completionTimeout
,如果您想要从实现到实现的固定时间如果您想在一段时间内没有任何元素通过时关闭,请使用close或idleTimeout
。请注意,这些流不会取消流但会使流失败,因此您必须使用recover
或recoverWith
阶段捕获异常,以将失败更改为取消(recoverWith
允许您通过不发送任何最后一个元素来取消,方法是使用Source.empty
恢复) 。
动态设置超时
现在,您想要的是根据第一个传递的元素动态设置关闭时间。这更加复杂,因为流的物化与通过流的元素无关。确实,在通常的情况下(没有groupBy
),流在任何元素通过它们之前就已经实现了,因此使用元素来实现它们是没有意义的。
我在that question中遇到了类似的问题,最终使用带签名的groupBy
的修改版
paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM])
允许使用定义子流的键来定义每个子流。可以将其修改为将第一个元素(而不是键)作为参数。
另一种方法(可能是更简单的方法)是编写自己的阶段,该阶段可以完全满足您的要求:从第一个元素获取结束时间并在该时间取消流。这是为此的示例实现(我使用调度程序而不是设置状态):
object CancelAfterTimer
class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("CancelAfter.in")
val out = Outlet[T]("CancelAfter.in")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (!isTimerActive(CancelAfterTimer))
scheduleOnce(CancelAfterTimer, getTimeout(elem))
push(out, elem)
}
override def onTimer(timerKey: Any): Unit =
completeStage() //this will cancel the upstream and close the downstrean
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
关于scala - 如何清理连续Akka流中的子流,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44016410/