我正在尝试使用CompleteableFutures将数据库快速加载到内存中。我在方法级别启动Spring事务:

@Transactional()
    private void loadErUp() {
        StopWatch sw = StopWatch.createStarted();
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        final ZonedDateTime zdt = ZonedDateTime.now(ZoneId.of(ZoneOffset.UTC.getId())).minusMinutes(REFRESH_OVERLAP);

        for (long i = 1; i < 12 + 1; i++) {
            Long holder = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                this.loadPartition(holder, zdt);
            }, this.forkJoinPool);
            calls.add(future);
        }
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        log.info("All data refreshed in ({}ms) since:{}", sw.getTime(), zdt.format(DateTimeFormatter.ISO_INSTANT));
    }


然后通过将每个线程附加到主事务


  TransactionSynchronizationManager.setActualTransactionActive(true);


private <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        log.debug("Refresh thread start:{}", partitionKey);
        TransactionSynchronizationManager.setActualTransactionActive(true);
        StopWatch sw = StopWatch.createStarted();

        try (Stream<Authority> authorityStream = aSqlRepo.findByPartitionKeyAndLastUpdatedTimeStampAfter(partitionKey, zdt)) {
            long count = authorityStream.peek(a -> {
                this.authorityRepository.set(this.GSON.fromJson(a.getJsonData(), AssetAuthority.class));
            }).count();
            log.info("Partition {} refreshed in ({}ms) with {} items.", partitionKey, sw.getTime(), count);
            return count;
        }
    }


因此,我每30秒运行一次该批处理作业,在第9次运行中,我得到4个线程,然后挂起(12 * 8运行= 96),因为它正在等待池打开。我得到:


  无法获取JDBC连接;无法提取连接
  30秒,无可用[大小:100;繁忙:100;空闲:0;
  lastwait:30000]。


因此很明显,连接没有提交。我以为是因为我有自己的ForkJoinPool,但是我关闭了所有这些线程,这似乎没有帮助。我还把另一个方法放在loadPartition()方法下面,但这似乎也没有帮助。还有另一个主题讨论如何使事务正常工作,但是我的工作却只是不提交。

最佳答案

如果要使每个#loadPartition在其自己的线程和自己的事务中运行,则需要:


#loadPartition标记为@Transactional
调用代理的#loadPartition方法,以便@Transactional起作用。您可以通过自动装配或从另一个代理类中调用方法来执行此操作


由于(重要!)that method is not getting proxied,事务未传播到异步线程。

因此它看起来像:

@Component
public class MyLoaderClass {

    // Autowire in this with constructor injection or @Autowired
    private MyLoaderClass myLoaderClass;

    // Removed @Transactional annotation
    public void loadErUp() {
        myLoaderClass.loadPartition(holder, zdt);
        ...
    }

    // 1) Add the @Transactional annotation to #loadPartition
    // 2) Make public to use self-autowiring (or refactored class, per link above)
    @Transactional
    public <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        ...
        // Can remove TransactionSyncManager call
        ...
    }

}


您还需要确保在不确保最后一个作业完成的情况下,批处理作业不会运行。您可以通过using the @Scheduled annotation轻松解决此表加载问题,以确保运行不会“重叠”。

09-27 18:48