我注意到Java 8中带有Streaming的CompleteableFutures有一些异常行为。
String [] arr = new String[]{"abc", "def", "cde", "ghj"};
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<String> lst =
Arrays.stream(arr)
.map(r ->
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "e";
} catch (Exception e) {
e.printStackTrace();
}
return null;
}, executorService)
)
.map(CompletableFuture::join)
.collect(Collectors.toList());
上面的这段代码执行需要4 * 5000 = 20秒,因此这意味着期货正在彼此等待。
String [] arr = new String[]{"abc", "def", "cde", "ghj"};
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<CompletableFuture<String>> lst =
Arrays.stream(arr)
.map(r ->
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "d";
} catch (Exception e) {
e.printStackTrace();
}
return null;
}, executorService)
)
.collect(Collectors.toList());
List<String> s =
lst
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(s);
但是,此代码在5秒钟内运行,这意味着期货正在并行运行。
我不明白的是:在第二个示例中,我明确地获得了一个期货列表,然后进行了连接,这需要5秒钟,第一个示例中,我一直在等待它。
这背后的原因是什么?
最佳答案
流不一定要先执行一个阶段,然后再执行下一个阶段。他们可以按照选择的任何顺序进行操作。
例如
Arrays.stream(array).map(e -> f(e)).map(e -> g(e)).collect(toList());
最终可能会以与
Arrays.stream(array).map(e -> g(f(e))).collect(toList());
...您将看到结果:一次生成一个期货并立即加入,而不是先生成所有期货然后加入。
实际上,如果您不执行异步操作,通常使用第二种方法会更有效。这样,流框架不必存储f的所有结果,而后存储g的所有结果:它只能存储g(f(e))的结果。流框架不知道您正在执行异步代码,因此它可以完成正常的高效工作。