我尝试将Java Streams的某些工作并行化。让我们考虑这个简单的例子:
Stream.generate(new Supplier<Integer>() {
@Override
public Integer get() {
return generateNewInteger();
}
})
.parallel()
.forEachOrdered(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println(integer);
}
});
问题是它不为
accept
调用forEachOrdered
方法,仅当我使用forEach
时才有效。我想问题是Stream.generate
在内部创建了没有InfiniteSupplyingSpliterator
特性的ORDERED
。问题是为什么?好像我们知道按什么顺序生成数据。第二个问题是如何在并行化流上生成
forEachOrdered
并生成流元素? 最佳答案
最简单的答案是,Stream.generate
是无序的,因为it’s specification这样说。
这与实现是否尝试在可能的情况下按顺序处理项目不同,实际上恰恰相反。一旦将操作定义为无序的,则实现将尽可能尝试从无序的性质中受益。如果您在无序操作中遇到的情况看起来像是源订单,则可能没有办法从无序处理中受益,或者实现尚未利用所有机会。由于这可能会在将来的版本或替代实现中更改,因此,如果已将操作指定为无序的,则您不得依赖该顺序。
与Stream.generate
(有序)相比,将Stream.iterate
定义为无序的意图可能会更清楚。传递给iterate
的函数将接收其前一个元素,因此元素之间存在一个先后关系,因此有一个顺序。传递给Stream.generate
的供应商没有收到前一个元素,换句话说,仅考虑功能签名时,它与前一个元素没有任何关系。像用例一样适用于Stream.generate(() -> constant)
或Stream.generate(Type::new)
,但不适用于Stream.generate(instance::statefulOp)
,这似乎不是预期的主要用例。如果操作是线程安全的,并且仍然可以保持流的无序性质,它仍然可以工作。
您的示例从未取得进展的原因是,forEachOrdered
的实现实际上并未考虑无序性质,而是尝试按遇到顺序拆分后处理块,即所有子任务都尝试缓冲其元素,以便它们可以一旦完成左侧任务的子任务,就将其传递给操作。当然,缓冲源和无限源不能很好地配合使用,尤其是因为底层的InfiniteSupplyingSpliterator
会分解成无限多个子任务。原则上,有一个最左边的任务可以将其元素直接输入到操作中,但是该任务似乎在队列中的某个位置,等待被激活,这将永远不会发生,因为所有工作线程已经在忙于处理另一个无限子线程-任务。最终,如果您让它运行足够长时间,则整个操作将以OutOfMemoryError
中断…