Problem Description
In Scala before 2.10, I could set the parallelism of the defaultForkJoinPool (as in this answer: scala parallel collections degree of parallelism). In Scala 2.10, that API no longer exists. It is well documented (http://docs.scala-lang.org/overviews/parallel-collections/configuration.html) that we can set the parallelism on a single collection by assigning to its tasksupport property.
However, I use parallel collections all over my codebase and would rather not add an extra two lines to every single collection instantiation. Is there some way to configure the global default thread pool size so that someCollection.par.map(f(_)) automatically uses the default number of threads?
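For reference, the per-collection configuration from the linked docs looks roughly like this. A sketch assuming Scala 2.10, where the pool class is scala.concurrent.forkjoin.ForkJoinPool (on Scala 2.12+ it is java.util.concurrent.ForkJoinPool, and on 2.13+ parallel collections live in the separate scala-parallel-collections module):

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val pc = (1 to 100).par
// These are the "extra two lines" per collection that the question wants to avoid:
pc.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
println(pc.map(_ * 2).sum)  // 10100, computed on the 4-thread pool
```

The assignment only affects this one collection instance; any other .par call elsewhere still uses the default pool, which is exactly the problem.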
Recommended Answer
I know that the question is over a month old, but I've just had exactly the same question. Googling wasn't helpful and I couldn't find anything that looked halfway sane in the new API.
Setting -Dscala.concurrent.context.maxThreads=n as suggested here (Set the parallelism level for all collections in Scala 2.10?) seemingly had no effect at all, but I'm not sure whether I used it correctly (I run my application with java in an environment without scala installed explicitly, which might be the cause).
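For completeness, this is what such an invocation would look like. The jar and class names (app.jar, Main) are placeholders; the one thing that matters is that the -D property is passed to the JVM itself, i.e. before -cp/-jar, not as a program argument:

```shell
# Hypothetical launch command; app.jar and Main are placeholders.
# The system property must precede -cp so the JVM, not the app, receives it.
java -Dscala.concurrent.context.maxThreads=4 -cp app.jar Main
```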
I don't know why scala-people removed this essential setter from the appropriate package object.
However, it's often possible to use reflection to work around an incomplete/weird interface:
def setParallelismGlobally(numThreads: Int): Unit = {
  // The package object instance that holds the defaultTaskSupport value
  val parPkgObj = scala.collection.parallel.`package`
  // Locate the private field backing defaultTaskSupport
  val defaultTaskSupportField = parPkgObj.getClass.getDeclaredFields.find {
    _.getName == "defaultTaskSupport"
  }.get
  defaultTaskSupportField.setAccessible(true)
  // Swap in a ForkJoinTaskSupport backed by a pool of the requested size
  defaultTaskSupportField.set(
    parPkgObj,
    new scala.collection.parallel.ForkJoinTaskSupport(
      new scala.concurrent.forkjoin.ForkJoinPool(numThreads)
    )
  )
}
For those not familiar with the more obscure features of Scala, here is a short explanation:
scala.collection.parallel.`package`
accesses the package object that holds the defaultTaskSupport variable (it looks somewhat like a Java static variable, but it's actually a member of the package object). The backticks are required around the identifier because package is a reserved keyword. Then we get the private final field that we want (getField("defaultTaskSupport") didn't work, most likely because getField only returns public fields, and the Scala compiler emits the backing field as private behind a public accessor method), tell it to be accessible so that we can modify it, and then replace its value with our own ForkJoinTaskSupport.
I don't yet understand the exact mechanism of the creation of parallel collections, but the source code of the Combiner trait suggests that the value of defaultTaskSupport should percolate to the parallel collections somehow.
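That percolation can be checked empirically. The following self-contained sketch (again assuming Scala 2.10; it repeats a condensed copy of the setter above so it can run standalone) swaps the pool and then reads tasksupport back from a freshly created parallel collection:

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Condensed copy of the reflection-based setter from the answer above
def setParallelismGlobally(numThreads: Int): Unit = {
  val parPkgObj = scala.collection.parallel.`package`
  val field = parPkgObj.getClass.getDeclaredFields
    .find(_.getName == "defaultTaskSupport").get
  field.setAccessible(true)
  field.set(parPkgObj, new ForkJoinTaskSupport(new ForkJoinPool(numThreads)))
}

setParallelismGlobally(3)

// A parallel collection created after the swap should pick up the new pool:
(1 to 10).par.tasksupport match {
  case fj: ForkJoinTaskSupport => println(fj.environment.getParallelism)  // should print 3
  case other                   => println("unexpected TaskSupport: " + other)
}
```

Collections created before the swap keep whatever tasksupport they were initialized with, so the setter should be called as early as possible in the program.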
Notice that the question is qualitatively of the same sort as a much older one: "I have Math.random() all over my codebase, how can I set the seed to a fixed number for debugging purposes?" (see e.g. Set seed on Math.random()). In both cases, we have some sort of global "static" variable that we implicitly use in a million different places, we want to change it, but there is no setter for this variable => we use reflection.
Ugly as hell, but it seems to work just fine. If you need to limit the total number of threads, don't forget that the garbage collector runs on its own threads.
This concludes the article on how to set the default number of threads for Scala 2.10 parallel collections. We hope the recommended answer above is helpful.