问题描述
我想使用自定义的ForkJoinPool与ArrayList.parallelStream()进行更多的并行处理(默认情况下,它使用公共池).
I want to use my custom ForkJoinPool to have more parallelism with ArrayList.parallelStream() (by default it uses common pool).
我这样做:
List<String> activities = new ArrayList<>();
for (int i = 0; i < 3000; i++) {
activities.add(String.valueOf(i));
}
ForkJoinPool pool = new ForkJoinPool(10);
pool.submit(() ->
activities.parallelStream()
.map(s -> {
try {
System.out.println("Start task = " + s);
Thread.sleep(100);
System.out.println("End task = " + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
})
.collect(toList())
).get();
当我观察in在VisualVM中的工作方式时,我看到:具有并行性10的VisualVM
and when I observe how in works in VisualVM, I see:VisualVM with parallelism 10
在某些时候,一些线程被停放,而其他线程则完成其余的工作.当我查看转储时,我发现什么都不做的线程处于停车状态.
At some point some threads are parked and the other threads do the rest of the work. When I looked at the dump, I saw that threads, which does nothing, was in parking state.
实验表明,如果您创建一个具有并行度参数的ForkJoinPool,它是2的幂,那么一切都很好...
Experimentally, it was revealed that if you create a ForkJoinPool with parallelism parameter what is a power of two, everything is OK...
List<String> activities = new ArrayList<>();
for (int i = 0; i < 3000; i++) {
activities.add(String.valueOf(i));
}
ForkJoinPool pool = new ForkJoinPool(16);
pool.submit(() ->
activities.parallelStream()
.map(s -> {
try {
System.out.println("Start task = " + s);
Thread.sleep(100);
System.out.println("End task = " + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
})
.collect(toList())
).get();
它可以是小于64的2的幂(我不知道为什么,但是它不超过ForkJoinPool的34个线程),但是如果不是2的幂,我们会得到奇怪的行为.
It can be any of powers of two less than 64 (I don't know why but it's not more than 34 threads of the ForkJoinPool), but if it is not power of two, we get the strange behaviour.
为什么会这样?如何使用它?
Why it happens? How to work with it?
推荐答案
而不是实例化您自己的ForkJoinPool,您应该调用工厂方法ForJoinPool.commonPool()
,该方法应与可用的内核相适应.
Instead of instanciating your own ForkJoinPool, you should invoque the factory method ForJoinPool.commonPool()
which should be adapted with your available cores.
可以尝试一下.
无论如何,我认为您最好使用Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
代替.
Anyway, I think you'd better have to use Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
instead.
这篇关于自定义ForkJoinPool中的嵌套ArrayList.ParallelStream()不均匀地使用线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!