假设我有一系列消息,其中包含标识客户端的ID。不同客户端的消息可以并行处理。同一客户端的消息一次只能处理一个。

使用Akka,我希望能够将消息路由到参与者池,但每个客户端id约束最多只能满足一个并发参与者。

解决此问题的最佳方法是什么?我是在误解Akka背后的任何概念,还是试图应用不适合演员模型的传统概念?

最佳答案

我相信这可以很简单地适合演员模型。顾名思义,单个参与者按顺序处理其传入消息,因此,一种简单的实现方法是为每个客户端创建一个参与者。

以这个简单的示例为例,您使用一个角色作为路由器,每个现有客户端使用一个工作线程。

  class Boss extends Actor {

    override def receive: Receive = jobHandler(Map.empty[ClientId, ActorRef])

    def jobHandler(workers: Map[ClientId, ActorRef]): Receive = {

      case j@Job(id, ...) if workers contains id =>
        workers(id) ! j

      case j@Job(id, ...) =>
        val worker = context.actorOf(Props[Worker])
        worker ! j
        context.become(jobHandler(workers + (id -> worker)))
    }
  }

  class Worker extends Actor {
    override def receive: Receive = {
      case Job(...) => doStuff(...)
    }
  }


请记住,演员非常轻巧,因此即使您需要跟踪很多演员,这也很重要。

关于java - 每个ID具有单个运行实例的Akka路由消息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41967086/

10-11 22:20
查看更多