我在为需要编排不同任务的程序实现优雅的函数式解决方案时遇到了麻烦。这是我想要实现的目标。
我有三个类,我想编排它们的方法(为简洁起见进行了简化):
class TaskA {
public ResultA call() {
return new ResultA();
}
}
class TaskB {
public ResultB call(ResultA a) {
return new ResultB();
}
}
class TaskC {
public ResultC call(List<ResultB> resultBs) {
return new ResultC();
}
}
我需要并行执行
TaskA
'n' 次,对于 TaskA
的每次执行,我需要使用相应 TaskB
的结果执行 TaskA
'n' 次。最后,我需要使用 TaskC
的所有调用结果执行一次 TaskB
。实现这一点的一种方法是创建一个
Callable
来封装对 TaskA
和 TaskB
的调用,最后在我的主线程中,收集 List
的 Future
的 ResultB
以执行 TaskC
:class TaskATaskBCallable implements Callable<ResultB> {
private TaskA taskA ...;
private TaskB taskB ...;
public ResultB call() {
return taskB.call(taskA.call());
}
}
在我的主线程中:
private ResultC orchestrate() {
ExecutorService service = ...;
List<Callable<ResultB>> callables = ...;
taskC.call(callables.map(callable ->
service.submit(callable)).map(Future::get).collect(Collectors.toList());
}
我不喜欢这个解决方案的一件事是
TaskATaskBCallable
。这可能是一个不必要的类耦合 TaskA
和 TaskB
。此外,如果我必须将另一个任务链接到 TaskA
和 TaskB
,我将不得不修改 TaskATaskBCallable
也可能修改其名称。我觉得我可以通过更聪明地使用 Java 并发库类(如 CompletableFuture
或 Phaser
)来摆脱它。任何指针?
最佳答案
我找到了一种使用 CompletableFuture
做到这一点的方法:
private ResultC orchestrate() {
ExecutorService service = ...;
int taskCount = ...;
List<CompletableFuture<ResultB>> resultBFutures = IntStream.rangeClosed(1, taskCount)
.mapToObj((i) -> CompletableFuture.supplyAsync(() -> new TaskA().call(), service))
.map(resultAFuture -> resultAFuture.thenApplyAsync(resultA -> new TaskB().call(resultA),
service))
.collect(Collectors.toList());
return new TaskC().call(CompletableFuture.allOf(resultBFutures.toArray(new CompletableFuture[resultBFutures.size()]))
.thenApply(v -> resultBFutures.stream().map(CompletableFuture::join)
.collect(Collectors.toList()))
.join());
}
关于java - 任务编排实现,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38687963/