我有一个演员“ ItemProvider”,可以接收“ getItems”消息。
ItemProvider管理项目的项目。因此,我可以有多个“ getItems”消息为项目A请求项目,而其他“ getItems”消息为项目B请求项目。
第一次“ itemProvider”收到这样的消息时,它需要调用服务才能实际获取商品
(这可能需要一分钟,服务会返回未来,因此它不会阻止actor)。在此等待期间,其他“ getItems”消息可以到达。
项目“ ItemProvider”缓存从服务接收到的“ Items”。
因此,在1分钟的装载时间后,它可以立即提供物品。
我很确定“ ItemProvider”应该使用Akka的“成为”功能。但是,该如何处理无法立即服务的客户呢?
我可以想到以下选项:
ItemProvider拥有一个列表未决消息。它将无法提供的消息添加到此列表中。当ItemProvider为“就绪”时,它将处理未决的客户端
ItemProvider将消息发送回其父级。家长会重新发出邮件
ItemProvider使用调度程序。并在将来再次获得消息。
也许不使用成为而是使用AbstractFSM类?
有人知道实现ItemProvider的最佳Akka方法吗?
最佳答案
在下面,您将找到一种构造角色以满足您的要求的可能方法。在此解决方案中,我将在每个项目中使用actor实例来缓存该项目的特定项。然后,我将使用路由角色,该角色将接收获取项目项目的请求,并委派给处理该项目缓存的正确子角色。在实际的缓存参与者中,您将看到我已使用存储/取消存储来推迟请求,直到要缓存的项目已加载(我在代码中模拟)。代码如下:
import akka.actor._
import scala.concurrent.Future
import akka.pattern._
import concurrent.duration._
import akka.util.Timeout
class ItemProviderRouter extends Actor{
import ItemProvider._
def receive = {
case get @ GetItems(project) =>
//Lookup the child for the supplied project. If one does not
//exist, create it
val child = context.child(project).getOrElse(newChild(project))
child.forward(get)
}
def newChild(project:String) = {
println(s"creating a new child ItemProvider for project $project")
context.actorOf(Props[ItemProvider], project)
}
}
object ItemProvider{
case class GetItems(project:String)
case class Item(foo:String)
case class LoadedItems(items:List[Item])
case object ClearCachedItems
case class ItemResults(items:List[Item])
}
class ItemProvider extends Actor with Stash{
import ItemProvider._
//Scheduled job to drop the cached items and force a reload on subsequent request
import context.dispatcher
context.system.scheduler.schedule(5 minutes, 5 minutes, self, ClearCachedItems)
def receive = noCachedItems
def noCachedItems:Receive = {
case GetItems(project) =>
stash()
fetchItems(project)
context.become(loadingItems)
case ClearCachedItems =>
//Noop
}
def loadingItems:Receive = {
case get:GetItems => stash
case LoadedItems(items) =>
println(s"Actor ${self.path.name} got items to cache, changing state to cachedItems")
context.become(cachedItems(items))
unstashAll()
case ClearCachedItems => //Noop
}
def cachedItems(items:List[Item]):Receive = {
case GetItems(project) =>
sender ! ItemResults(items)
case ClearCachedItems =>
println("Clearing out cached items")
context.become(noCachedItems)
case other =>
println(s"Received unexpected request $other when in state cachedItems")
}
def fetchItems(project:String){
println(s"Actor ${self.path.name} is fetching items to cache")
//Simulating doing something that results in a Future
//representing the items to cache
val fut = Future{
Thread.sleep(5000)
List(Item(s"hello $project"), Item(s"world $project"))
}
fut.map(LoadedItems(_)).pipeTo(self)
}
}
然后对其进行测试:
object ItemProviderTest extends App{
import ItemProvider._
val system = ActorSystem("test")
import system.dispatcher
val provider = system.actorOf(Props[ItemProviderRouter])
implicit val timeout = Timeout(10 seconds)
for(i <- 1 until 20){
val afut = provider ? GetItems("a")
val bfut = provider ? GetItems("b")
afut onSuccess{
case ItemResults(items) => println(s"got items list of $items for project a")
}
bfut onSuccess{
case ItemResults(items) => println(s"got items list of $items for project b")
}
}
}
为简单起见,我使用实际的actor进行路由,而不是使用自定义路由器,但是如果性能(即邮箱命中率)对您很重要,您也可以在此处实现自定义路由器。