问题陈述:我有一些证券需要以并行方式处理。在Java中,我使用线程池来处理每个安全性,并使用闩锁来倒数计时。完成后,我进行合并等。

因此,我向我的SecurityProcessor(它是一个参与者)发送消息,然后等待所有将来完成。最后,我使用MergeHelper进行后期处理。 SecurityProcessor获取安全性,执行一些I / O和处理并回复安全性

  val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures += (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get


此设计是否正确?是否有更好/更凉爽的方法来使用Akka解决此常见问题?

最佳答案

val futureResult = Future.sequence(
                  portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
                ) map { securities => MergeHelper.merge(portfolio, securities) }

09-26 21:01