我已经编写了一个人为的代码示例,它可能不是某人应该使用的代码,但是我相信应该可以正常工作。但是,它却陷入僵局。我已经阅读了here中描述的答案,但发现它们不够用。
这是代码示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String argv[]) throws Exception {

        int nThreads = 1;
        Executor executor = Executors.newFixedThreadPool( nThreads );



        CompletableFuture.completedFuture(true)
            .thenComposeAsync((unused)->{

                System.err.println("About to enqueue task");
                CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
                executor.execute(() -> {

                    // pretend this is some really expensive computation done asynchronously

                    System.err.println("Inner task");
                    innerFuture.complete(true);
                });
                System.err.println("Task enqueued");

                return innerFuture;
            }, executor).get();

        System.err.println("All done");
        System.exit(0);

    }

}
打印:

然后挂起。之所以陷入僵局,是因为执行程序只有一个线程,并且它正在等待innerFuture成为可赎回对象。为什么“thenComposeAsync”阻止其返回值可兑换,而不是返回仍然不完整的将来并在执行程序中释放其线程?
这感觉是完全不直观的,并且javadocs并没有真正的帮助。我是否从根本上误解了CompletionStages的工作方式?还是这是实现中的错误?

最佳答案

首先,让我使用2个静态函数重写您的代码,以更轻松地了解正在发生的情况:

// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>()) {

                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.err.println("Executor beginning task on thread: "
                       + t.getName());
                }

                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.err.println("Executor finishing task on thread: "
                       + Thread.currentThread().getName());
                }

            };
}


// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
    return b -> {
        System.err.println(Thread.currentThread().getName()
                   + ": About to enqueue task");
        CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
        executor.execute(() -> {
            System.err.println(Thread.currentThread().getName()
                   + ": Inner task");
            innerFuture.complete(true);
        });
        System.err.println(Thread.currentThread().getName()
                   + ": Task enqueued");

        return innerFuture;
    };
}

现在我们可以编写您的测试用例,如下所示:
ExecutorService e = loggingExecutor(1);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

让我们测试一下您的结论,即在计算出第二个Future的结果之前,不会释放第一个线程:
ExecutorService e = loggingExecutor(2);  // use 2 threads this time

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/

确实,线程1似乎一直保持到线程2完成为止

让我们看看thenComposeAsync本身是否阻止了您:
ExecutorService e = loggingExecutor(1);

CompletableFuture<Boolean> future =
        CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e);

System.err.println("thenComposeAsync returned");

future.join();

e.shutdown();

/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
thenComposeAsync没有被阻止。它立即返回了CompletableFuture,并且只有在我们尝试完成时才发生死锁。那么,完成.thenComposeAsync(inner(e), e)返回的 future 应该怎么做?
  • API需要等待innner(e)返回CompletableFuture<Boolean>
  • ,它需要等待返回的CompletableFuture<Boolean>也完成。只有这样, future 才算完整。因此,如您所见,它无法执行您的建议并返回不完整的Future。

  • 是 bug 吗?为什么在计算内部任务时CompletionStage保留在线程1上?正如您所指出的,这不是错误,因为文档非常模糊,并且不保证以任何特定顺序释放线程。另外,请注意,Thread1将用于CompletableFuture的任何后续then*()方法。考虑以下:
    ExecutorService e = loggingExecutor(2);
    
    CompletableFuture.completedFuture(true)
            .thenComposeAsync(inner(e), e)
            .thenRun(() -> System.err.println(Thread.currentThread().getName()
                             + ": All done"))
            .join();
    
    e.shutdown();
    
    /*
    Executor beginning task on thread: pool-1-thread-1
    pool-1-thread-1: About to enqueue task
    pool-1-thread-1: Task enqueued
    Executor beginning task on thread: pool-1-thread-2
    pool-1-thread-2: Inner task
    Executor finishing task on thread: pool-1-thread-2
    pool-1-thread-1: All done
    Executor finishing task on thread: pool-1-thread-1
    */
    

    如您所见,.thenRun(...)在线程1上执行。我相信这与CompletableFuture的其他* Async(...,Executor exec)方法一致。

    但是,如果您想将thenComposeAsync的功能分为两个可单独控制的步骤,而不是将其留给API来处理线程,该怎么办?您可以这样做:
    ExecutorService e = loggingExecutor(1);
    
    completedFuture(true)
            .thenApplyAsync(inner(e), e) // do the async part first
            .thenCompose(x -> x)         // compose separately
            .thenRun(() -> System.err.println(Thread.currentThread().getName()
                            + ": All done"))
            .join();
    
    e.shutdown();
    

    一切将在1个线程上完美运行,没有死锁。

    总之,您所说的这种行为是不直观的吗?我不知道。我无法想象为什么thenComposeAsync甚至存在。如果方法返回CompletableFuture,则不应阻塞该方法,并且没有理由异步调用它。

    09-25 22:04