本文介绍了如何限制Scala中未处理的期货数量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果无法限制Scala中未处理的期货数量,我将无法提供资金.例如下面的代码:

I cannot fund if there is way to limit number of unprocessed Futures in Scala. For example in following code:

import ExecutionContext.Implicits.global    
for (i <- 1 to N) {
  val f = Future {
    //Some Work with bunch of object creation
  }
}

如果N太大,它将最终引发OOM.有没有一种方法可以通过队列式等待或异常来限制未处理的期货以太币的数量?

if N is too big, it will eventually throw OOM. Is there a way to limit number of unprocessed Futures ether with queue-like wait or with exception?

推荐答案

因此,最简单的答案是,您可以创建一个ExecutionContext,该ExecutionContext阻止或限制新任务的执行超出一定限制.参见此博客文章 .有关阻塞的Java ExecutorService的更多实例,请参见示例. [如果需要,您可以直接使用它,Maven Central上的库为此处.]这包装了一些非阻塞的ExecutorService,您可以使用java.util.concurrent.Executors的工厂方法来创建.

So, the simplest answer is that you can create an ExecutionContext that blocks or throttles the execution of new tasks beyond a certain limit. See this blog post. For a more fleshed out example of a blocking Java ExecutorService, here is an example. [You can use it directly if you want, the library on Maven Central is here.] This wraps some nonblocking ExecutorService, which you can create using the factory methods of java.util.concurrent.Executors.

要将Java ExecutorService转换为Scala ExecutionContext只是ExecutionContext.fromExecutorService( executorService ).因此,使用上面链接的库,您可能会有类似...的代码

To convert a Java ExecutorService into a Scala ExecutionContext is just ExecutionContext.fromExecutorService( executorService ). So, using the library linked above, you might have code like...

import java.util.concurrent.{ExecutionContext,Executors}
import com.mchange.v3.concurrent.BoundedExecutorService

val executorService = new BoundedExecutorService(
  Executors.newFixedThreadPool( 10 ), // a pool of ten Threads
  100,                                // block new tasks when 100 are in process
  50                                  // restart accepting tasks when the number of in-process tasks falls below 50
 )

implicit val executionContext = ExecutionContext.fromExecutorService( executorService )

// do stuff that creates lots of futures here...

如果您想要一个有界的ExecutorService可以持续到整个应用程序一样长的时间,那很好.但是,如果您要在代码的本地化点中创建大量期货,则在完成ExecutorService后将要关闭它.我在Scala中定义贷款模式方法 [ Maven Central ]既创建上下文,又在完成后将其关闭.代码最终看起来像...

That's fine if you want a bounded ExecutorService that will last as long as your whole application. But if you are creating lots of futures in a localized point in your code, and you will want to shut down the ExecutorService when you are done with it. I define loan-pattern methods in Scala [maven central] that both create the context and shut it down after I'm done. The code ends up looking like...

import com.mchange.sc.v2.concurrent.ExecutionContexts

ExecutionContexts.withBoundedFixedThreadPool( size = 10, blockBound = 100, restartBeneath = 50 ) { implicit executionContext =>
    // do stuff that creates lots of futures here...

    // make sure the Futures have completed before the scope ends!
    // that's important! otherwise, some Futures will never get to run
}

您可以使用实例来强制执行任务调度(Future创建)Thread来执行任务,而不是异步运行它,而不是使用完全阻塞的ExecutorService实例.您可以使用ThreadPoolExecutor.CallerRunsPolicy来创建java.util.concurrent.ThreadPoolExecutor.但是 ThreadPoolExecutor 的构建相当复杂直接.

Rather than using an ExecutorService, that blocks outright, you can use an instance that slows things down by forcing the task-scheduling (Future-creating) Thread to execute the task rather than running it asynchronously. You'd make a java.util.concurrent.ThreadPoolExecutor using ThreadPoolExecutor.CallerRunsPolicy. But ThreadPoolExecutor is fairly complex to build directly.

所有这一切的更新,更性感,更以Scala为中心的替代方法是查看 Akka Streams 作为Future的替代方案,可通过反压"并发执行以防止OutOfMemoryErrors.

A newer, sexier, more Scala-centric alternative to all of this would be to check out Akka Streams as an alternative to Future for concurrent execution with "back-pressure" to prevent OutOfMemoryErrors.

这篇关于如何限制Scala中未处理的期货数量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-15 23:34