问题描述
我正在使用Akka群集,以便在两个pahase中执行分布式计算。首先是 phaseA
,然后是 phaseB
。为了处理阶段,我使用Akka的FSM。
没有硬同步,因此其中一个节点可能达到 phaseB
而其他人仍处于阶段A
。
问题是,一个处于阶段B
向其他人发送与 phaseB相关的
消息(它们处于 phaseA
),这导致它们失去了 phaseB相关的消息。
现在,我使用简单的技巧来推迟未知消息:
任意==>自我!任何
但是IMO这不是正确的方法。我知道我也可以使用akka调度程序来调度任何
,但是我也不喜欢这样。
这里是简化代码:
package不论
import akka.actor._
对象测试扩展了App {
案例对象PhaseA
案例对象PhaseB
类任何扩展Actor {
def phaseA:接收= {
case PhaseA => {
context.become(phaseB)
println( in phaseB now)
}
case any =>自我!任何
}
def phaseB:接收= {
情况PhaseB => println( got phaseB message!)
}
def接收= phaseA
}
val system = ActorSystem( MySystem )
val any = system.actorOf(Props(new Any),name = any)
any! PhaseB
任何! PhaseA
}
在这种情况下推迟消息的正确方法是什么?
您可以隐藏消息以供以后处理。将 akka.actor.Stash
混合到您的actor中,然后将 stash()
您的 phaseB
消息以供日后使用。
当您的FSM处于 phaseA
并收到 phaseB
消息,调用 stash()
。当该参与者随后转换为 phaseB
状态时,调用 unstashAll()
,所有隐藏的消息将重新传递。 / p>
I'm using akka cluster in order to perform distributed computations in two pahses. First phaseA
then phaseB
. To handle phases I use akka's FSM.
There is no hard synchronization so one of the nodes may reach phaseB
while others are still in phaseA
.
The problem is, one in phaseB
sends phaseB-related
messages to others (they are in phaseA
yet) what causes them to loose phaseB-related
messages.
For now I use simple trick to postpone unknown messages:
case any => self ! any
But IMO this is not proper way to do that. I know I can also schedule any
using akka scheduler, but I don't like this either.
Here is simplified code:
package whatever
import akka.actor._
object Test extends App {
case object PhaseA
case object PhaseB
class Any extends Actor {
def phaseA: Receive = {
case PhaseA => {
context.become(phaseB)
println("in phaseB now")
}
case any => self ! any
}
def phaseB: Receive = {
case PhaseB => println("got phaseB message !")
}
def receive = phaseA
}
val system = ActorSystem("MySystem")
val any = system.actorOf(Props(new Any), name = "any")
any ! PhaseB
any ! PhaseA
}
What is the correct way to postpone messages in such a situation?
You can stash messages for later processing. Mix akka.actor.Stash
into your actors and stash()
your phaseB
messages for later.
When your FSM is in phaseA
and receives a phaseB
message, call stash()
. When that actor then transitions into the phaseB
state, call unstashAll()
and all the stashed messages will be redelivered.
这篇关于在Akka中推迟邮件的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!