在我的场景中,我有2位 Actor :

  • watchee(我使用TestProbe)
  • watcher(包裹在Watcher中的TestActorRef公开了我在测试中跟踪的一些内部state)
  • watchee死亡时,观察者应采取一些措施。

    这是到目前为止我编写的完整测试用例:
    class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {
    
      def this() = this(ActorSystem("TempTest"))
    
      override def afterAll {
        TestKit.shutdownActorSystem(system)
      }
    
      class WatcherActor(watchee: ActorRef) extends Actor {
    
        var state = "initial"
        context.watch(watchee)
    
        override def receive: Receive = {
          case "start" =>
            state = "start"
          case _: Terminated =>
            state = "terminated"
        }
    
      }
    
      test("example") {
        val watchee = TestProbe()
        val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
    
        assert(watcher.underlyingActor.state === "initial")
    
        watcher ! "start" // "start" will be sent and handled by watcher synchronously
        assert(watcher.underlyingActor.state === "start")
    
        system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
        Thread.sleep(100) // what is the best way to avoid blocking here?
        assert(watcher.underlyingActor.state === "terminated")
      }
    
    }
    

    现在,由于所有参与者都使用CallingThreadDispatcher(所有Akka的测试助手都使用带有.withDispatcher(CallingThreadDispatcher.Id)的props构造),因此我可以放心地假设该语句返回时:
    watcher ! "start"
    

    ...“开始”消息已由WatchingActor处理,因此我可以基于watcher.underlyingActor.state进行断言

    但是,根据我的观察,当我停止使用watchee或通过向其发送system.stop停止Kill时,作为Terminated死亡的副作用而产生的watchee消息将在另一个线程中异步执行。

    一种解决方案不是停止watchee,将线程阻塞一段时间并在此之后验证Watcher状态,但是我想知道如何正确执行此操作(即,如何确保在杀死actor之后,它是观察者收到并处理了表示死亡的 Terminated消息)?

    最佳答案

    编辑:
    经过与OP的讨论和测试后,我们发现发送PoisonPill作为终止被监视的actor的方式达到了所需的行为,因为PPill的Terminated是同步处理的,而stop或kill的终止是异步处理的。

    尽管我们不确定原因,但最好的选择是这是由于杀死 Actor 会引发异常(exception),而PPilling不会异常(exception)。

    显然,这与我最初的建议使用gracefulStop模式无关,我将在下面进行报告。

    总之,解决OP问题的方法是终止被监视的 Actor 发送PPill而不是发送Kill消息或通过执行system.stop来终止。

    旧的答案从这里开始:

    我可能会建议一些无关紧要的内容,但我认为它可能适用。

    如果我正确理解了,您想要做的基本上是同步终止一个 Actor ,即执行仅在该 Actor 正式死亡且已记录其死亡(观察者为您的情况)后才返回的操作。

    通常,死亡通知以及akka中的其他大多数通知都是异步的。尽管如此,仍可以使用gracefulStop模式(akka.pattern.gracefulStop)获得同步死亡确认。

    为此,代码应类似于以下内容:

    val timeout = 5.seconds
    val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
    Await.result(killResultFuture, timeout)
    

    这样做是向受害者发送PoisonPill(请注意:您可以使用自定义消息),一旦受害者死亡,后者会答复 future 。使用Await.result可确保您保持同步。

    不幸的是,这仅在以下情况下可用:a)您正在积极杀死受害者,而又不想响应外部死亡原因b)您可以在代码中使用超时和阻塞。但是也许您可以适应这种情况。

    关于scala - Akka:测试监视\死亡监视,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26433494/

    10-11 21:19