本文介绍了在Akka中推迟邮件的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Akka群集,以便在两个pahase中执行分布式计算。首先是 phaseA ,然后是 phaseB 。为了处理阶段,我使用Akka的FSM。



没有硬同步,因此其中一个节点可能达到 phaseB 而其他人仍处于阶段A



问题是,一个处于阶段B 向其他人发送与 phaseB相关的消息(它们处于 phaseA ),这导致它们失去了 phaseB相关的消息。



现在,我使用简单的技巧来推迟未知消息:

 任意==>自我!任何

但是IMO这不是正确的方法。我知道我也可以使用akka调度程序来调度任何,但是我也不喜欢这样。



这里是简化代码:

  package不论

import akka.actor._

对象测试扩展了App {

案例对象PhaseA
案例对象PhaseB

类任何扩展Actor {

def phaseA:接收= {
case PhaseA => {
context.become(phaseB)
println( in phaseB now)
}
case any =>自我!任何
}

def phaseB:接收= {
情况PhaseB => println( got phaseB message!)
}

def接收= phaseA

}

val system = ActorSystem( MySystem )
val any = system.actorOf(Props(new Any),name = any)
any! PhaseB
任何! PhaseA
}

在这种情况下推迟消息的正确方法是什么?

解决方案

您可以隐藏消息以供以后处理。将 akka.actor.Stash 混合到您的actor中,然后将 stash()您的 phaseB 消息以供日后使用。



当您的FSM处于 phaseA 并收到 phaseB 消息,调用 stash()。当该参与者随后转换为 phaseB 状态时,调用 unstashAll(),所有隐藏的消息将重新传递。 / p>

I'm using akka cluster in order to perform distributed computations in two pahses. First phaseA then phaseB. To handle phases I use akka's FSM.

There is no hard synchronization so one of the nodes may reach phaseB while others are still in phaseA.

The problem is, one in phaseB sends phaseB-related messages to others (they are in phaseA yet) what causes them to loose phaseB-related messages.

For now I use simple trick to postpone unknown messages:

case any => self ! any

But IMO this is not proper way to do that. I know I can also schedule any using akka scheduler, but I don't like this either.

Here is simplified code:

package whatever

import akka.actor._

object Test extends App {

  case object PhaseA
  case object PhaseB

  class Any extends Actor {

    def phaseA: Receive = {
      case PhaseA => {
        context.become(phaseB)
        println("in phaseB now")
      }
      case any => self ! any
    }

    def phaseB: Receive = {
      case PhaseB => println("got phaseB message !")
    }

    def receive = phaseA

  }

  val system = ActorSystem("MySystem")
  val any = system.actorOf(Props(new Any), name = "any")
  any ! PhaseB
  any ! PhaseA
}

What is the correct way to postpone messages in such a situation?

解决方案

You can stash messages for later processing. Mix akka.actor.Stash into your actors and stash() your phaseB messages for later.

When your FSM is in phaseA and receives a phaseB message, call stash(). When that actor then transitions into the phaseB state, call unstashAll() and all the stashed messages will be redelivered.

这篇关于在Akka中推迟邮件的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-29 15:57