在我的场景中,我有2位 Actor :
watchee
(我使用TestProbe
)watcher
(包裹在Watcher
中的TestActorRef
公开了我在测试中跟踪的一些内部state
)watchee
死亡时,观察者应采取一些措施。这是到目前为止我编写的完整测试用例:
class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TempTest"))
override def afterAll {
TestKit.shutdownActorSystem(system)
}
class WatcherActor(watchee: ActorRef) extends Actor {
var state = "initial"
context.watch(watchee)
override def receive: Receive = {
case "start" =>
state = "start"
case _: Terminated =>
state = "terminated"
}
}
test("example") {
val watchee = TestProbe()
val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
assert(watcher.underlyingActor.state === "initial")
watcher ! "start" // "start" will be sent and handled by watcher synchronously
assert(watcher.underlyingActor.state === "start")
system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
Thread.sleep(100) // what is the best way to avoid blocking here?
assert(watcher.underlyingActor.state === "terminated")
}
}
现在,由于所有参与者都使用
CallingThreadDispatcher
(所有Akka的测试助手都使用带有.withDispatcher(CallingThreadDispatcher.Id)
的props构造),因此我可以放心地假设该语句返回时:watcher ! "start"
...“开始”消息已由
WatchingActor
处理,因此我可以基于watcher.underlyingActor.state
进行断言但是,根据我的观察,当我停止使用
watchee
或通过向其发送system.stop
停止Kill
时,作为Terminated
死亡的副作用而产生的watchee
消息将在另一个线程中异步执行。一种解决方案不是停止
watchee
,将线程阻塞一段时间并在此之后验证Watcher
状态,但是我想知道如何正确执行此操作(即,如何确保在杀死actor之后,它是观察者收到并处理了表示死亡的 Terminated
消息)? 最佳答案
编辑:
经过与OP的讨论和测试后,我们发现发送PoisonPill作为终止被监视的actor的方式达到了所需的行为,因为PPill的Terminated是同步处理的,而stop或kill的终止是异步处理的。
尽管我们不确定原因,但最好的选择是这是由于杀死 Actor 会引发异常(exception),而PPilling不会异常(exception)。
显然,这与我最初的建议使用gracefulStop模式无关,我将在下面进行报告。
总之,解决OP问题的方法是终止被监视的 Actor 发送PPill而不是发送Kill消息或通过执行system.stop来终止。
旧的答案从这里开始:
我可能会建议一些无关紧要的内容,但我认为它可能适用。
如果我正确理解了,您想要做的基本上是同步终止一个 Actor ,即执行仅在该 Actor 正式死亡且已记录其死亡(观察者为您的情况)后才返回的操作。
通常,死亡通知以及akka中的其他大多数通知都是异步的。尽管如此,仍可以使用gracefulStop模式(akka.pattern.gracefulStop)获得同步死亡确认。
为此,代码应类似于以下内容:
val timeout = 5.seconds
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
Await.result(killResultFuture, timeout)
这样做是向受害者发送PoisonPill(请注意:您可以使用自定义消息),一旦受害者死亡,后者会答复 future 。使用Await.result可确保您保持同步。
不幸的是,这仅在以下情况下可用:a)您正在积极杀死受害者,而又不想响应外部死亡原因b)您可以在代码中使用超时和阻塞。但是也许您可以适应这种情况。
关于scala - Akka:测试监视\死亡监视,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26433494/