我有以下代码(来自my previous question),该代码在远程服务器上安排任务,然后使用ScheduledExecutorService#scheduleAtFixedRate轮询完成。任务完成后,它将下载结果。
我想将Future返回给调用方,以便他们可以决定阻止时间和阻止时间,并为他们提供取消任务的选项。

我的问题是,如果客户端取消了Future方法返回的download,则whenComplete块不会执行。如果删除thenApply,它会删除。很明显,我误解了Future的组成...我应该改变什么?

public Future<Object> download(Something something) {
    String jobId = schedule(something);
    CompletableFuture<String> job = pollForCompletion(jobId);
    return job.thenApply(this::downloadResult);
}

private CompletableFuture<String> pollForCompletion(String jobId) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<String> completionFuture = new CompletableFuture<>();

    ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {
            if (pollRemoteServer(jobId).equals("COMPLETE")) {
                completionFuture.complete(jobId);
            }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture
            .whenComplete((result, thrown) -> {
                System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
                checkFuture.cancel(true);
                executor.shutdown();
            });
    return completionFuture;
}

同样,如果我这样做:
return completionFuture.whenComplete(...)

代替
completionFuture.whenComplete(...);
return completionFuture;
whenComplete也永远不会执行。这对我来说似乎很违反直觉。从逻辑上讲,由Future返回的whenComplete应该不是我应该坚持的吗?

编辑:

我更改了代码,以明确向后传播取消。这是令人讨厌和难以理解的,但是它可以工作,我找不到更好的方法:
public Future<Object> download(Something something) throws ChartDataGenException, Exception {
        String jobId = schedule(something);
        CompletableFuture<String> job = pollForCompletion(jobId);
        CompletableFuture<Object> resulting = job.thenApply(this::download);
        resulting.whenComplete((result, thrown) -> {
            if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
                job.cancel(true);
            }
        });
        return resulting;
}

编辑2:

我发现了tascalate-concurrent,这是一个很棒的库,提供了CompletionStage的合理实现,并支持可以透明地反向传播取消的相关 promise (通过DependentPromise类)。似乎非常适合此用例。

这应该足够了:
DependentPromise
          .from(pollForCompletion(jobId))
          .thenApply(this::download, true); //true means the cancellation should back-propagate

请注意,没有测试这种方法。

最佳答案

您的结构如下:

           ┌──────────────────┐
           │ completionFuture |
           └──────────────────┘
             ↓              ↓
  ┌──────────────┐      ┌───────────┐
  │ whenComplete |      │ thenApply |
  └──────────────┘      └───────────┘

因此,当您取消thenApply的 future 时,原始的completionFuture对象不会受到影响,因为它并不取决于thenApply阶段。但是,如果您不链接thenApply阶段,那么您将返回原始的completionFuture实例,取消此阶段会导致所有相关阶段的取消,从而使whenComplete操作立即执行。

但是,如果取消thenApply阶段,则在满足completionFuture条件时,pollRemoteServer(jobId).equals("COMPLETE")仍可能会完成,因为轮询不会停止。但是我们不知道jobId = schedule(something)pollRemoteServer(jobId)的关系。如果您的应用程序状态以某种方式更改,以致取消下载后再也无法满足此条件,那么将来将永远无法完成…

关于您的最后一个问题,哪个 future 是“我应该坚持的那个 future ?”,实际上,并不需要线性的 future 链,而CompletableFuture的便捷方法使创建这样的链条变得容易,更多通常,这是最没有用的事情,因为如果您具有线性依赖性,则只需编写一段代码即可。您将两个独立阶段链接在一起的模型是正确的,但是取消不能通过它完成,但是也不能通过线性链来完成。

如果您希望能够取消源阶段,则需要对其进行引用,但是,如果想要获得从属阶段的结果,则也需要对该阶段进行引用。

09-04 11:20
查看更多