我已经编写了一个人为的代码示例,它可能不是某人应该使用的代码,但是我相信应该可以正常工作。但是,它却陷入僵局。我已经阅读了here中描述的答案,但发现它们不够用。
这是代码示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Test {
public static void main(String argv[]) throws Exception {
int nThreads = 1;
Executor executor = Executors.newFixedThreadPool( nThreads );
CompletableFuture.completedFuture(true)
.thenComposeAsync((unused)->{
System.err.println("About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
// pretend this is some really expensive computation done asynchronously
System.err.println("Inner task");
innerFuture.complete(true);
});
System.err.println("Task enqueued");
return innerFuture;
}, executor).get();
System.err.println("All done");
System.exit(0);
}
}
打印:然后挂起。之所以陷入僵局,是因为执行程序只有一个线程,并且它正在等待innerFuture成为可赎回对象。为什么“thenComposeAsync”阻止其返回值可兑换,而不是返回仍然不完整的将来并在执行程序中释放其线程?
这感觉是完全不直观的,并且javadocs并没有真正的帮助。我是否从根本上误解了CompletionStages的工作方式?还是这是实现中的错误?
最佳答案
首先,让我使用2个静态函数重写您的代码,以更轻松地了解正在发生的情况:
// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.err.println("Executor beginning task on thread: "
+ t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.err.println("Executor finishing task on thread: "
+ Thread.currentThread().getName());
}
};
}
和
// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
return b -> {
System.err.println(Thread.currentThread().getName()
+ ": About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
System.err.println(Thread.currentThread().getName()
+ ": Inner task");
innerFuture.complete(true);
});
System.err.println(Thread.currentThread().getName()
+ ": Task enqueued");
return innerFuture;
};
}
现在我们可以编写您的测试用例,如下所示:
ExecutorService e = loggingExecutor(1);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
让我们测试一下您的结论,即在计算出第二个Future的结果之前,不会释放第一个线程:
ExecutorService e = loggingExecutor(2); // use 2 threads this time
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/
确实,线程1似乎一直保持到线程2完成为止
让我们看看
thenComposeAsync
本身是否阻止了您:ExecutorService e = loggingExecutor(1);
CompletableFuture<Boolean> future =
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e);
System.err.println("thenComposeAsync returned");
future.join();
e.shutdown();
/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
thenComposeAsync
没有被阻止。它立即返回了CompletableFuture
,并且只有在我们尝试完成时才发生死锁。那么,完成.thenComposeAsync(inner(e), e)
返回的 future 应该怎么做?CompletableFuture<Boolean>
CompletableFuture<Boolean>
也完成。只有这样, future 才算完整。因此,如您所见,它无法执行您的建议并返回不完整的Future。 是 bug 吗?为什么在计算内部任务时CompletionStage保留在线程1上?正如您所指出的,这不是错误,因为文档非常模糊,并且不保证以任何特定顺序释放线程。另外,请注意,Thread1将用于CompletableFuture的任何后续
then*()
方法。考虑以下:ExecutorService e = loggingExecutor(2);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/
如您所见,.thenRun(...)在线程1上执行。我相信这与CompletableFuture的其他* Async(...,Executor exec)方法一致。
但是,如果您想将
thenComposeAsync
的功能分为两个可单独控制的步骤,而不是将其留给API来处理线程,该怎么办?您可以这样做:ExecutorService e = loggingExecutor(1);
completedFuture(true)
.thenApplyAsync(inner(e), e) // do the async part first
.thenCompose(x -> x) // compose separately
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
一切将在1个线程上完美运行,没有死锁。
总之,您所说的这种行为是不直观的吗?我不知道。我无法想象为什么
thenComposeAsync
甚至存在。如果方法返回CompletableFuture
,则不应阻塞该方法,并且没有理由异步调用它。