本文介绍了并行运行多个期货,超时时返回默认值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我必须并行运行多个期货,并且程序不应崩溃或挂起。

I have to run multiple futures in parallel and the program shouldn't crash or hang.

现在我逐一等待期货,并使用后备值

For now I wait on futures one by one, and use fallback value if there is TimeoutException.

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

// <- at this point all 3 futures are running

// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds 
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

def toFallback[T](f: Future[T], to: Int, default: T) = {
  Try(Await.result(f, to seconds))
    .recover { case to: TimeoutException => default }
}

如我所见,此代码段的最大等待时间为 timeout1 + timeout2 + timeout3

As I can see, maximum wait time of this snippet is timeout1 + timeout2 + timeout3

我的问题是:我如何一次等待所有这些期货,所以我可以将等待时间减少到 max(timeout1,timeout2,timeout3)

My question is: how can I wait on all of those futures at once, so I can reduce wait time to max(timeout1, timeout2, timeout3)?

编辑:最后,我使用了@Jatin和@senia答案:

In the end I used modification of @Jatin and @senia answers:

private def composeWaitingFuture[T](fut: Future[T], 
                                    timeout: Int, default: T) =
  future { Await.result(fut, timeout seconds) } recover {
    case e: Exception => default
  }

之后,它的用法如下:

// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
  for {
    r1 <- res1
    r2 <- res2
    r3 <- res3
  } yield (r1, r2, r3)

我认为合适的话使用 combinedFuture

推荐答案

def toFallback[T](f: Future[T], to: Int, default: T) = {
  future{
  try{
        Await.result(f, to seconds)
   }catch{
        case e:TimeoutException => default
  }
 }

您甚至可以使此块异步,并且每个请求等待其最大时间。如果线程过多,则可能只有一个线程可以使用Akka的系统调度程序来检查其他期货。 @Senia在下面对此做了回答。

You can even make this block asynchronous and each request waits for its maximum time. If there are too many threads, probably have a single thread that keeps checking for other futures using Akka's system scheduler. @Senia has answered below on this one.

这篇关于并行运行多个期货,超时时返回默认值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-25 09:23