我尝试将Java Streams的某些工作并行化。让我们考虑这个简单的例子:

Stream.generate(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return generateNewInteger();
        }
    })
    .parallel()
    .forEachOrdered(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });

问题是它不为accept调用forEachOrdered方法,仅当我使用forEach时才有效。我想问题是Stream.generate在内部创建了没有InfiniteSupplyingSpliterator特性的ORDERED

问题是为什么?好像我们知道按什么顺序生成数据。第二个问题是如何在并行化流上生成forEachOrdered并生成流元素?

最佳答案

最简单的答案是,Stream.generate是无序的,因为it’s specification这样说。

这与实现是否尝试在可能的情况下按顺序处理项目不同,实际上恰恰相反。一旦将操作定义为无序的,则实现将尽可能尝试从无序的性质中受益。如果您在无序操作中遇到的情况看起来像是源订单,则可能没有办法从无序处理中受益,或者实现尚未利用所有机会。由于这可能会在将来的版本或替代实现中更改,因此,如果已将操作指定为无序的,则您不得依赖该顺序。

Stream.generate(有序)相比,将Stream.iterate定义为无序的意图可能会更清楚。传递给iterate的函数将接收其前一个元素,因此元素之间存在一个先后关系,因此有一个顺序。传递给Stream.generate的供应商没有收到前一个元素,换句话说,仅考虑功能签名时,它与前一个元素没有任何关系。像用例一样适用于Stream.generate(() -> constant)Stream.generate(Type::new),但不适用于Stream.generate(instance::statefulOp),这似乎不是预期的主要用例。如果操作是线程安全的,并且仍然可以保持流的无序性质,它仍然可以工作。

您的示例从未取得进展的原因是,forEachOrdered的实现实际上并未考虑无序性质,而是尝试按遇到顺序拆分后处理块,即所有子任务都尝试缓冲其元素,以便它们可以一旦完成左侧任务的子任务,就将其传递给操作。当然,缓冲源和无限源不能很好地配合使用,尤其是因为底层的InfiniteSupplyingSpliterator会分解成无限多个子任务。原则上,有一个最左边的任务可以将其元素直接输入到操作中,但是该任务似乎在队列中的某个位置,等待被激活,这将永远不会发生,因为所有工作线程已经在忙于处理另一个无限子线程-任务。最终,如果您让它运行足够长时间,则整个操作将以OutOfMemoryError中断…

09-27 18:14