Before the Spring Batch job runs, I have an import table containing all the items that need to be imported into our system. At that point the table has been validated to contain only items that do not already exist in our system.
Next, I have a Spring Batch job that reads from this import table with a JpaPagingItemReader.
When processing is done, it writes to the database with an ItemWriter.
I run with a page size and chunk size of 10000.
Now, this works absolutely fine on MySQL InnoDB. I can even run it with multiple threads and everything works.
But now we are migrating to PostgreSQL, and the same batch job runs into a very strange problem:
It tries to insert duplicates into our system. Those are naturally rejected by a unique index constraint, which throws an error.
Since the import table is validated to contain only non-existing items before the batch job starts, the only cause I can think of is that the JpaPagingItemReader reads some rows from the import table more than once when I run on Postgres. But why would it do that?
I have experimented with a lot of settings. Shrinking the chunk and page size to around 100 only makes the import slower; the error stays the same. Running single-threaded instead of multi-threaded only makes the error appear slightly later.
So what on earth could make my JpaPagingItemReader read the same items more than once, and only on PostgreSQL?
The select statement backing the reader is simple; it is a NamedQuery:
@NamedQuery(name = "ImportDTO.findAllForInsert",
        query = "select h from ImportDTO h where h.toBeImported = true")
Also note that the batch job never touches the toBeImported flag while it runs, so the results of this query should be identical before, during, and after the batch job.
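For context, a JpaPagingItemReader executes a fresh query for every page it fetches, roughly equivalent to the following sketch (illustrative only; entityManager, page, and pageSize are placeholder names, not part of the configuration below):
// Roughly what JpaPagingItemReader does per page: re-run the named query
// with an offset and limit. Each page is an independent query execution.
List<ImportDTO> pageOfItems = entityManager
        .createNamedQuery("ImportDTO.findAllForInsert", ImportDTO.class)
        .setFirstResult(page * pageSize)  // becomes OFFSET in the generated SQL
        .setMaxResults(pageSize)          // becomes LIMIT in the generated SQL
        .getResultList();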
Any insights, tips, or help are greatly appreciated!
Here is the batch configuration code:
import javax.persistence.EntityManagerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private OrganizationItemWriter organizationItemWriter;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private OrganizationUpdateProcessor organizationUpdateProcessor;

    @Autowired
    private OrganizationInsertProcessor organizationInsertProcessor;

    private Integer organizationBatchSize = 10000;
    private Integer organizationThreadSize = 3;
    private Integer maxThreadSize = organizationThreadSize;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findNewImportsToImport() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForInsert");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findImportsToUpdate() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForUpdate");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public OrganizationItemWriter writer() throws Exception {
        return organizationItemWriter;
    }

    @Bean
    public StepExecutionNotificationListener stepExecutionListener() {
        return new StepExecutionNotificationListener();
    }

    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(maxThreadSize);
        return taskExecutor;
    }

    @Bean
    public Job importOrganizationsJob(JobCompletionNotificationListener listener) throws Exception {
        return jobBuilderFactory.get("importAndUpdateOrganizationJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(importNewOrganizationsFromImports())
                .next(updateOrganizationsFromImports())
                .build();
    }

    @Bean
    public Step importNewOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("importNewOrganizationsFromImports")
                .<ImportDTO, Organization>chunk(organizationBatchSize)
                .reader(findNewImportsToImport())
                .processor(organizationInsertProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }

    @Bean
    public Step updateOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("updateOrganizationsFromImports")
                .<ImportDTO, Organization>chunk(organizationBatchSize)
                .reader(findImportsToUpdate())
                .processor(organizationUpdateProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }
}
Best answer
You need to add an order by clause to the select statement. A paging reader runs a separate query for each page (limit/offset under the hood), and without an order by, SQL gives no guarantee about the order in which rows come back. PostgreSQL is free to return them in a different order for every page, so some rows show up on more than one page (the duplicates you see) while others are never read at all. MySQL InnoDB just happens to return rows in primary-key order for a query like this, which is why the problem stays hidden there. Order by a unique column, ideally the primary key, so every page sees the same stable ordering.
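A minimal sketch of the fix, assuming ImportDTO has an id primary-key field (the field name is an assumption; use whatever the entity's key actually is):
@NamedQuery(name = "ImportDTO.findAllForInsert",
        query = "select h from ImportDTO h where h.toBeImported = true order by h.id")
// `h.id` is assumed to be ImportDTO's primary key. Any unique, indexed
// column works; ordering by a non-unique column can still leave ties
// shuffled between pages.
The same change applies to the ImportDTO.findAllForUpdate query used by the update step.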
Regarding "java - Spring Batch - JpaPagingItemReader - works in MySQL - duplicates in PostgreSQL", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48684224/