我想了解Java中嵌套流之间的排序约束。
示例1:

public static void main(String[] args) {
    IntStream.range(0, 10).forEach(i -> {
        System.out.println(i);
        IntStream.range(0, 10).forEach(j -> {
            System.out.println("    " + i + " " + j);
        });
    });
}
这段代码是确定性地执行的,因此内部循环在每个forEach上运行j,然后外部循环在下一个forEach上运行自己的i:
0
    0 0
    0 1
    0 2
    0 3
    0 4
    0 5
    0 6
    0 7
    0 8
    0 9
1
    1 0
    1 1
    1 2
    1 3
    1 4
    1 5
    1 6
    1 7
    1 8
    1 9
2
    2 0
    2 1
    2 2
    2 3
...
示例2:
public static void main(String[] args) {
    IntStream.range(0, 10).parallel().forEach(i -> {
        System.out.println(i);
        IntStream.range(0, 10).parallel().forEach(j -> {
            System.out.println("    " + i + " " + j);
        });
    });
}
如果像在第二个示例中那样将流变成parallel(),我可以想象内部工作人员在等待线程在外部工作队列中可用时会阻塞,因为外部工作队列线程必须在内部流完成时阻塞,并且默认线程池只有有限数量的线程。但是,似乎不会发生死锁:
6
5
8
    8 6
0
1
    6 2
7
    1 6
    8 5
    7 6
    8 8
2
    0 6
    0 2
    0 8
    5 2
    5 4
    5 6
    0 5
    2 6
    7 2
    7 5
    7 8
    6 4
    8 9
    1 5
 ...
两个流共享相同的默认线程池,但是它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成后才能完成,因为在每个并行流的末尾都有一个完成屏障。
如何在共享的工作线程共享池中管理这些内部和外部流之间的协调,而又没有任何死锁?

最佳答案

并行流背后的线程池是公共池,您可以使用ForkJoinPool.commonPool()获得。它通常使用NumberOfProcessors-1个工人。为了解决您所描述的依赖关系,如果(某些)当前工作程序被阻止并且可能出现死锁,它可以动态创建其他工作程序。
但是,这不是您的情况的答案。ForkJoinPool中的任务具有两个重要功能:

  • 他们可以创建子任务,并将当前任务拆分为较小的碎片(叉子)。
  • 他们可以等待子任务(加入)。

  • 当线程执行这样的任务A并加入子任务B时,它不仅会等待阻止子任务完成其执行,还会同时执行另一个任务C。 C完成后,线程返回A并检查B是否完成。请注意,B和C可以(并且很可能是)相同的任务。如果B完成,则A已成功等待/加入它(无阻塞!)。如果前面的解释不清楚,请查阅this指南。
    现在,当您使用并行流时,该流的范围将递归地分为多个任务,直到任务变得如此之小以至于它们可以更有效地顺序执行。这些任务被放入公共池中的工作队列(每个工作人员一个)。因此,IntStream.range(0, 100).parallel().forEach所做的是递归地划分范围,直到不再值得为止。可以使用forEach中提供的代码按顺序执行每个最终任务,或者一堆迭代。此时,公共池中的工作程序可以执行这些任务,直到所有任务都完成并且流可以返回为止。请注意,调用线程通过加入子任务来帮助执行!
    现在,在您的情况下,这些任务中的每一个都使用并行流。程序是一样的。将其拆分为较小的任务,然后将这些任务放入公共池的工作队列中。从ForkJoinPool的角度来看,这些只是已经存在的任务之外的其他任务。工人只是继续执行/加入任务,直到所有工作都完成并且外部流可以返回为止。
    这是您在输出中看到的结果:没有确定性的行为,没有固定的顺序。同样也不会发生死锁,因为在给定的用例中不会阻塞线程。
    您可以使用以下代码查看说明:
        public static void main(String[] args) {
            IntStream.range(0, 10).parallel().forEach(i -> {
                IntStream.range(0, 10).parallel().forEach(j -> {
                    for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
                    System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
                    for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
                });
            });
        }
    
    您应该注意,内部线程的执行涉及主线程,因此它不会被(!)阻塞。普通池工作人员只是一个接一个地选择任务,直到所有任务都完成为止。

    07-24 15:28