问题描述
想法
我有一种处理方法,该方法接收项目列表并使用外部Web服务异步处理它们.处理步骤还可以在处理过程中保留数据.在整个过程的最后,我也希望将整个过程以及每个处理的结果都保留下来.
I have a processing method which takes in a list of items and processes them asynchronously using external web service. The process steps also persist data while processing. At the end of whole process, I want to persist the whole process along with each processed results as well.
问题
我将列表中的每个项目转换为CompletableFuture
,并对它们执行处理任务,然后将它们放回期货数组中.现在,使用其.ofAll
方法(顺序方法)来完成所有提交的任务后的将来,并返回另一个保存结果的CompletableFuture
.
I convert each item in the list into CompletableFuture
and run a processing task on them, and put them back into an array of futures. Now using its .ofAll
method (in sequence method) to complete future when all the submitted tasks are completed and return another CompletableFuture
which holds the result.
当我想获得该结果时,我调用.whenComplete(..)
,并希望将返回的结果作为数据设置到我的实体中,然后保存到数据库中,但是存储库save调用什么都不做,仅继续执行线程继续运行,不会超过存储库保存调用.
When I want to get that result, I call .whenComplete(..)
, and would want to set the returned result into my entity as data, and then persist to the database, however the repository save call just does nothing and continues threads just continue running, it's not going past the repository save call.
@Transactional
public void process(List<Item> items) {
List<Item> savedItems = itemRepository.save(items);
final Process process = createNewProcess();
final List<CompletableFuture<ProcessData>> futures = savedItems.stream()
.map(item -> CompletableFuture.supplyAsync(() -> doProcess(item, process), executor))
.collect(Collectors.toList());
sequence(futures).whenComplete((data, throwable) -> {
process.setData(data);
processRepository.save(process); // <-- transaction lost?
log.debug("Process DONE"); // <-- never reached
});
}
排序方法
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
}
发生了什么事?为什么持久调用未通过.是启动事务的线程不能提交事务,或者在哪里丢失事务?所有处理过的数据都返回正常并且都很好.我尝试了不同的交易策略,但是如果是这样的话,如何控制哪个线程可以完成交易?
What is happening? Why is the persist call not passing. Is the thread that started the transaction not able to commit the transaction or where does it get lost? All the processed data returns fine and is all good. I've tried different transaction strategies, but how is it possible to control which thread is gonna finish the transaction, if it's the case?
有什么建议吗?
推荐答案
您遇到问题的原因是,如上所述,交易结束了当方法process(..)的返回值到达时.
The reason of your problem is, as said above, that the transaction endswhen the return of method process(..) is reached.
您可以做的是手动创建交易,从而使您全力以赴控制它的开始和结束时间.
What you can do, is create the transaction manually, that gives you fullcontrol over when it starts and ends.
删除@Transactional
Remove @Transactional
然后自动连接TransactionManager并进行处理(..):
Autowire the TransactionManager then in process(..) :
TransactionDefinition txDef = new DefaultTransactionDefinition();
TransactionStatus txStatus = transactionManager.getTransaction(txDef);
try {
//do your stuff here like
doWhateverAsync().then(transactionManager.commit(txStatus);)
} catch (Exception e) {
transactionManager.rollback(txStatus);
throw e;
}
这篇关于CompletableFuture与Spring交易的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!