问题陈述:我有一些证券需要以并行方式处理。在Java中,我使用线程池来处理每个安全性,并使用闩锁来倒数计时。完成后,我进行合并等。
因此,我向我的SecurityProcessor(它是一个参与者)发送消息,然后等待所有将来完成。最后,我使用MergeHelper进行后期处理。 SecurityProcessor获取安全性,执行一些I / O和处理并回复安全性
val listOfFutures = new ListBuffer[Future[Security]]()
var portfolioResponse: Portfolio = _
for (security <- portfolio.getSecurities.toList) {
val securityProcessor = actorOf[SecurityProcessor].start()
listOfFutures += (securityProcessor ? security) map {
_.asInstanceOf[Security]
}
}
val futures = Future.sequence(listOfFutures.toList)
futures.map {
listOfSecurities =>
portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
}.get
此设计是否正确?是否有更好/更凉爽的方法来使用Akka解决此常见问题?
最佳答案
val futureResult = Future.sequence(
portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
) map { securities => MergeHelper.merge(portfolio, securities) }