在这种情况下,我需要使用迭代器,对于每个项目,都会调用一个函数f(item)并返回一个Future[Unit]

但是,我需要使每个f(item)调用顺序执行,它们不能并行运行。

for(item <- it)
  f(item)

将无法工作,因为这会同时启动所有 call 。

我该怎么做,以便他们按顺序进行?

最佳答案

如果您不介意本地化的var,则可以按以下方式序列化异步处理(每个f(item))(flatMap进行序列化):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}

fSerialized.onComplete{case resTry => println("All Done.")}

通常,请避免Await操作-它们会阻塞(有点破坏异步点,消耗资源,对于草率的设计,可能会死锁)

绝招1:

您可以通过通常的怀疑者Futuresflatmap链接在一起-它序列化异步操作。有什么不能做的吗? ;-)
def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}

val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)

fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}

以上所有都不是-主线程在几十纳秒内直通。在所有情况下,都使用 future 来执行并行线程并跟踪异步状态/结果并链接逻辑。
fSerialized表示两个不同的异步操作链在一起的组合。一旦评估了val,它将立即启动f1(异步运行)。 f1与任何Future一样运行-最终完成时,将其称为onComplete回调块。这很酷-flatMap将其参数安装为f1 onComplete回调块-因此f2f1完成后立即启动,没有阻塞,轮询或浪费资源的使用。当f2完成时,然后fSerialized完成-因此它运行fSerialized.onComplete回调块-打印“两个都完成”。

不仅如此,而且您还可以使用整洁的非意大利面条式代码随意链接平面图
 f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...

如果要通过Future.onComplete进行此操作,则必须将后续操作作为嵌套在onComplete层上进行嵌入:
f1.onComplete{case res1Try =>
  f2
  f2.onComplete{case res2Try =>
    f3
    f3.onComplete{case res3Try =>
      f4
      f4.onComplete{ ...
      }
    }
  }
}

不太好

测试以证明:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))

fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

酷技巧2:

这样的理解:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr

除了语法糖外,什么也没有:
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }

那是一串flatMap,然后是最终 map 。

那意味着
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")

等同于
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"

测试以证明(根据之前的测试):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

不太酷的把戏3:

不幸的是,您不能将迭代器和 future 混为一谈。编译错误:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last

而嵌套fors带来了挑战。以下代码不会序列化,而是并行运行异步块(嵌套的理解不会将后续的Future链接到flatMap / Map,而是链接为Iterable.flatMap {item => f(item)}-不一样!)
val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last

同样使用foldLeft / foldRight和flatMap不能按您期望的那样工作-似乎是一个错误/局限;所有异步块都是并行处理的(因此Iterator.foldLeft/RightFuture.flatMap不兼容):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

但这有效(涉及var):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))

关于scala - 如何在scala中顺序执行 future ,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/20414500/

10-15 10:19