
Problem description

I am using the delayed restarts with backoff feature of Akka Streams, and it doesn't seem to work for me.

My test code is:

object Test {
  import akka.stream.scaladsl.{ Flow, RestartFlow, Sink, Source }
  import scala.concurrent.duration._
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer

  implicit val actorSystem = ActorSystem()
  implicit val mat = ActorMaterializer()

  def main(args: Array[String]): Unit = {
    val source = Source(1 to 10)
    val flow = Flow[Int].map { x =>
      println(s"Processing: $x")
      if (x != 6) {
        x * 2
      } else throw new RuntimeException("Baam!!")
    }
    val restartFlow = RestartFlow.onFailuresWithBackoff(10.milliseconds, 20.milliseconds, 0.2, 3)(() => flow)
    source.via(restartFlow).to(Sink.ignore).run()
  }
}

I expect the exception to happen three times, since max restarts is set to 3. Instead, I see the exception only once. What am I missing here?

Output from running it:

Processing: 1
Processing: 2
Processing: 3
Processing: 4
Processing: 5
Processing: 6
[ERROR] [03/20/2018 16:13:59.167] [default-akka.actor.default-dispatcher-2] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure
java.lang.RuntimeException: Baam!!
    at Test$.$anonfun$main$1(Test.scala:18)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:12)
    at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:53)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:585)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:469)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:560)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:742)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:732)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:726)
    at akka.actor.Actor.aroundPreStart(Actor.scala:528)
    at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:667)
    at akka.actor.ActorCell.create(ActorCell.scala:654)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:525)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Recommended answer

You've encountered a reported issue: https://github.com/akka/akka/issues/24726

Update: This issue was fixed in the Akka 2.5.13 patch release.
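
If the project is built with sbt, picking up the fix could look roughly like the fragment below. This is only a sketch: the use of sbt and the `akka-stream` module coordinates are assumptions about the project setup that the question does not state; 2.5.13 is the patch release mentioned in the update.

```scala
// build.sbt (assumed build tool; adjust to your own build)
// Pin akka-stream to the patch release that contains the RestartFlow fix.
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.13"
```

Any other Akka modules in the build would normally be kept on the same version to avoid mixing releases.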

