Before the Spring Batch job runs, I have an import table that contains all the items that need to be imported into our system. At that point it has been validated to contain only items that do not already exist in our system.

Next, I have a Spring Batch job that reads from this import table with a JpaPagingItemReader.
When a chunk completes, it writes to the db with an ItemWriter.

My page size and chunk size are both 10000.
This works absolutely fine when running on MySQL InnoDB. I can even use multiple threads and everything works.

But now we are migrating to PostgreSQL, and the same batch job runs into some very strange problems.
What happens is that it tries to insert duplicates into our system. Those are naturally rejected by the unique index constraint, which raises an error.
Since the import table is validated, before the batch job starts, to contain only rows that do not already exist, the only cause I can think of is that on Postgres the JpaPagingItemReader reads some rows from the import table more than once. But why would it do that?

I have experimented with a lot of settings. Reducing the chunk and page size to around 100 only makes the import slower, with the same error. Running single-threaded instead of multi-threaded just makes the error occur slightly later.
So what on earth could cause my JpaPagingItemReader to read the same items more than once, and only on PostgreSQL?
The select statement backing the reader is simple; it's a NamedQuery:

@NamedQuery(name = "ImportDTO.findAllForInsert",
            query = "select h from ImportDTO h where h.toBeImported = true")


Also note that the batch job never modifies the toBeImported flag while running, so this query should return the same results before, during, and after the batch job.
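My suspicion is that paging without a stable sort order can hand back overlapping pages. That effect is easy to reproduce outside any database; below is a minimal plain-Java sketch, where the shuffle between pages stands in for the unspecified row order a database is allowed to use when the query has no order by clause:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class PagingWithoutOrderDemo {

    // Counts how many rows are read more than once when each "page" query
    // may see the table in a different order (no ORDER BY => unspecified).
    static int countDuplicateReads(int rowCount, int pageSize, long seed) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) {
            table.add(i);
        }
        Random random = new Random(seed);
        Set<Integer> seen = new HashSet<>();
        int duplicates = 0;
        for (int offset = 0; offset < rowCount; offset += pageSize) {
            // Each page is a fresh query; the database is free to return
            // the rows in a different order every time.
            Collections.shuffle(table, random);
            int end = Math.min(offset + pageSize, rowCount);
            for (Integer row : table.subList(offset, end)) {
                if (!seen.add(row)) {
                    duplicates++;
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        System.out.println("duplicate reads: "
                + countDuplicateReads(100, 10, 42L));
    }
}
```

If the order never changed between pages (drop the shuffle), every row would be read exactly once; every row read twice here also means some other row was never read at all.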

Any insight, tips, or help would be greatly appreciated!

Here is the batch configuration code:

import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private OrganizationItemWriter organizationItemWriter;
    @Autowired
    private EntityManagerFactory entityManagerFactory;
    @Autowired
    private OrganizationUpdateProcessor organizationUpdateProcessor;
    @Autowired
    private OrganizationInsertProcessor organizationInsertProcessor;

    private Integer organizationBatchSize = 10000;
    private Integer organizationThreadSize = 3;
    private Integer maxThreadSize = organizationThreadSize;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findNewImportsToImport() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForInsert");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findImportsToUpdate() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForUpdate");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public OrganizationItemWriter writer() throws Exception {
        return organizationItemWriter;
    }

    @Bean
    public StepExecutionNotificationListener stepExecutionListener() {
        return new StepExecutionNotificationListener();
    }

    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(maxThreadSize);
        return taskExecutor;
    }

    @Bean
    public Job importOrganizationsJob(JobCompletionNotificationListener listener) throws Exception {
        return jobBuilderFactory.get("importAndUpdateOrganizationJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(importNewOrganizationsFromImports())
                .next(updateOrganizationsFromImports())
                .build();
    }

    @Bean
    public Step importNewOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("importNewOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findNewImportsToImport())
                .processor(organizationInsertProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }


    @Bean
    public Step updateOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("updateOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findImportsToUpdate())
                .processor(organizationUpdateProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }
}

Best answer

You need to add an order by clause to your select.

JpaPagingItemReader fetches each page with a separate query (via setFirstResult/setMaxResults). Without an order by, SQL leaves the row order unspecified, so consecutive pages may overlap or skip rows; the overlapping pages are exactly the duplicate inserts you are seeing. MySQL InnoDB happens to return rows in a stable order for this query, which is why the job only appeared to work there; PostgreSQL gives no such guarantee.
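Concretely, assuming ImportDTO has a unique id field, ordering by it makes the paging deterministic:

```java
@NamedQuery(name = "ImportDTO.findAllForInsert",
            query = "select h from ImportDTO h where h.toBeImported = true order by h.id asc")
```

Any column (or combination of columns) works, as long as it is unique across the result set; ordering by a non-unique column still leaves ties in database-chosen order and can reproduce the same bug at page boundaries.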

Regarding java - Spring Batch - JpaPagingItemReader - works in MySQL - duplicates in PostgreSQL, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48684224/
