

我正在使用Akka群集,以便在两个pahase中执行分布式计算。首先是 phaseA ,然后是 phaseB 。为了处理阶段,我使用Akka的FSM。

没有硬同步,因此其中一个节点可能达到 phaseB 而其他人仍处于阶段A

问题是,一个处于阶段B 向其他人发送与 phaseB相关的消息(它们处于 phaseA ),这导致它们失去了 phaseB相关的消息。






import akka.actor._

对象测试扩展了App {


类任何扩展Actor {

def phaseA:接收= {
case PhaseA => {
println( in phaseB now)
case any =>自我!任何

def phaseB:接收= {
情况PhaseB => println( got phaseB message!)

def接收= phaseA


val system = ActorSystem( MySystem )
val any = system.actorOf(Props(new Any),name = any)
any! PhaseB
任何! PhaseA



您可以隐藏消息以供以后处理。将 akka.actor.Stash 混合到您的actor中,然后将 stash()您的 phaseB 消息以供日后使用。

当您的FSM处于 phaseA 并收到 phaseB 消息,调用 stash()。当该参与者随后转换为 phaseB 状态时,调用 unstashAll(),所有隐藏的消息将重新传递。 / p>

I'm using akka cluster in order to perform distributed computations in two pahses. First phaseA then phaseB. To handle phases I use akka's FSM.

There is no hard synchronization so one of the nodes may reach phaseB while others are still in phaseA.

The problem is, one in phaseB sends phaseB-related messages to others (they are in phaseA yet) what causes them to loose phaseB-related messages.

For now I use simple trick to postpone unknown messages:

case any => self ! any

But IMO this is not proper way to do that. I know I can also schedule any using akka scheduler, but I don't like this either.

Here is simplified code:

package whatever

import akka.actor._

object Test extends App {

  case object PhaseA
  case object PhaseB

  class Any extends Actor {

    def phaseA: Receive = {
      case PhaseA => {
        println("in phaseB now")
      case any => self ! any

    def phaseB: Receive = {
      case PhaseB => println("got phaseB message !")

    def receive = phaseA


  val system = ActorSystem("MySystem")
  val any = system.actorOf(Props(new Any), name = "any")
  any ! PhaseB
  any ! PhaseA

What is the correct way to postpone messages in such a situation?


You can stash messages for later processing. Mix akka.actor.Stash into your actors and stash() your phaseB messages for later.

When your FSM is in phaseA and receives a phaseB message, call stash(). When that actor then transitions into the phaseB state, call unstashAll() and all the stashed messages will be redelivered.


10-29 15:57