我已经编写了如下方法:
public static CompletionStage<Tuple2<ObjectNode, String>> calculateTemplateTreeAndKeys(
String content,
RequestContext context,
MetricsClient metricsClient,
JdbcSession jdbcSession) {
AtomicReference<ObjectNode> templateTreeHolder = new AtomicReference<>();
templateTreeHolder.set(Json.rootNode());
return getTemplateIds(context, metricsClient, jdbcSession, content)
.thenCompose(
templateIds -> {
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
}));
return CompletableFuture.completedFuture(NotUsed.getInstance());
})
.thenApply(
notUsed -> {
String includedTemplateIdsStr =
getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
System.out.println("From outer function: " + templateTreeHolder.get());
return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
});
我希望内部块在处理和更新templateTreeHolder之前
调用.thenApply,以便templateTreeHolder保留要返回的正确数据。但是,.thenApply块在内部.thenAccept块之前进行处理。
从控制台输出序列:
From outer function: {}
From inner function: {"f9406341-c62a-411a-9389-00a62bd63629":{}}
我不确定在链接CompletionStages时我做错了什么,请告诉我如何确保内部块在外部块之前完成?
最佳答案
传递给thenCompose
的函数将返回一个已经完成的未来,即return CompletableFuture.completedFuture(NotUsed.getInstance());
,它允许从属阶段立即进行。显然,这似乎与传递给templateIds.map(…)
的函数的评估冲突,该评估显然是异步发生的。
通常,应避免将完成阶段和对副作用的依赖性混合在一起,尤其是在未将其异步评估建模为前提完成阶段的情况下。
但是,如果您别无选择,则可以解决此问题:
return getTemplateIds(context, metricsClient, jdbcSession, content)
.thenCompose(
templateIds -> {
// create an initially uncompleted stage
CompletableFuture<Object> subStage = new CompletableFuture<>();
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
// complete when all work has been done
subStage.complete(null);
}));
// use this stage for dependent actions
return subStage;
})
.thenApply(
notUsed -> {
String includedTemplateIdsStr =
getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
System.out.println("From outer function: " + templateTreeHolder.get());
return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
});
在上面的代码中,如果您的操作因尝试完成前的异常而失败,则将来将永远无法完成。一般模式如下:
CompletableFuture<Type> stage = new CompletableFuture<>();
…
try {
code that will eventually call complete on stage
}
catch(Throwable t) {
stage.completeExceptionally(t);
}
但是,当然,当应该完成该阶段的代码也要进行异步处理时,它将变得更加复杂,因此您必须保护试图提交实际完成代码以及实际完成代码的代码。
因此,内部代码的详细版本如下所示:
CompletableFuture<Object> subStage = new CompletableFuture<>();
try {
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
})
.whenComplete((v,t) -> {
// complete when all work has been done
if(t != null) subStage.completeExceptionally(t);
else subStage.complete(v);
}));
} catch(Throwable t) {
subStage.completeExceptionally(t);
}
// use this stage for dependent actions
return subStage;
(也许,“执行某些操作并返回CompletionStage”也必须使用
try { … } catch(Throwable t) { subStage.completeExceptionally(t); }
进行保护)