Problem Description
I have fairly large CSV files which I need to parse and then persist into PostgreSQL. For example, one file contains 2,070,000 records, which I was able to parse and persist in about 8 minutes (single thread). Is it possible to persist them using multiple threads?
public void importCsv(MultipartFile csvFile, Class<T> targetClass) {
    final var headerMapping = getHeaderMapping(targetClass);
    File tempFile = null;
    try {
        final var randomUuid = UUID.randomUUID().toString();
        tempFile = File.createTempFile("data-" + randomUuid, "csv");
        csvFile.transferTo(tempFile);
        final var csvFileName = csvFile.getOriginalFilename();
        final var csvReader = new BufferedReader(new FileReader(tempFile, StandardCharsets.UTF_8));

        Stopwatch stopWatch = Stopwatch.createStarted();
        log.info("Starting to import {}", csvFileName);
        final var csvRecords = CSVFormat.DEFAULT
                .withDelimiter(';')
                .withHeader(headerMapping.keySet().toArray(String[]::new))
                .withSkipHeaderRecord(true)
                .parse(csvReader);
        final var models = StreamSupport.stream(csvRecords.spliterator(), true)
                .map(record -> parseRecord(record, headerMapping, targetClass))
                .collect(Collectors.toUnmodifiableList());
        // How to save such a large list?

        log.info("Finished import of {} in {}", csvFileName, stopWatch);
    } catch (IOException ex) {
        ex.printStackTrace();
    } finally {
        tempFile.delete();
    }
}
models contains a lot of records. The parsing into records is done with a parallel stream, so it's quite fast. I'm afraid to call SimpleJpaRepository.saveAll, because I'm not sure what it will do under the hood.
The question is: what is the most efficient way to persist such a large list of entities?
P.S.: Any other improvements are greatly appreciated.
Recommended Answer
You have to use batch inserts.
- Create an interface for a custom repository, SomeRepositoryCustom:
public interface SomeRepositoryCustom {
    void batchSave(List<Record> records);
}
- Create an implementation of SomeRepositoryCustom (Spring Data looks up the fragment implementation by name, so call it SomeRepositoryCustomImpl):
@Repository
class SomeRepositoryCustomImpl implements SomeRepositoryCustom {

    private final JdbcTemplate template;

    @Autowired
    public SomeRepositoryCustomImpl(JdbcTemplate template) {
        this.template = template;
    }

    @Override
    public void batchSave(List<Record> records) {
        final String sql = "INSERT INTO RECORDS(column_a, column_b) VALUES (?, ?)";
        template.execute(sql, (PreparedStatementCallback<Void>) ps -> {
            for (Record record : records) {
                ps.setString(1, record.getA());
                ps.setString(2, record.getB());
                ps.addBatch();
            }
            ps.executeBatch();
            return null;
        });
    }
}
- Extend your JpaRepository with SomeRepositoryCustom:
@Repository
public interface SomeRepository extends JpaRepository<Record, Long>, SomeRepositoryCustom {
    // Long is assumed for the entity's ID type here; use your actual ID type.
}
Save:
someRepository.batchSave(records);
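As an aside, the same batch insert could also be written with JdbcTemplate.batchUpdate and a BatchPreparedStatementSetter instead of the PreparedStatementCallback shown above; this is just an alternative sketch under the same Record/RECORDS assumptions, not what the answer itself proposes.
// Alternative sketch: JdbcTemplate handles addBatch()/executeBatch() internally.
// Needs org.springframework.jdbc.core.BatchPreparedStatementSetter,
// java.sql.PreparedStatement and java.sql.SQLException.
@Override
public void batchSave(List<Record> records) {
    final String sql = "INSERT INTO RECORDS(column_a, column_b) VALUES (?, ?)";
    template.batchUpdate(sql, new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            ps.setString(1, records.get(i).getA());
            ps.setString(2, records.get(i).getB());
        }

        @Override
        public int getBatchSize() {
            return records.size();
        }
    });
}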
Notes
Keep in mind that even if you use batch inserts, the database driver may not actually apply them. For MySQL, for example, you need to add the parameter rewriteBatchedStatements=true to the database URL. So it's better to enable driver-level SQL logging (not Hibernate's) to verify everything; it can also be useful to step through the driver code.
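Since the question targets PostgreSQL rather than MySQL, note that the PostgreSQL JDBC driver has an analogous connection property, reWriteBatchedInserts=true, which rewrites batched single-row INSERTs into multi-row INSERTs. A minimal sketch of setting it, assuming Spring Boot's DataSourceBuilder; the host, database name and credentials are placeholders (you could equally just append the property to spring.datasource.url):
@Configuration
class DataSourceConfig {

    @Bean
    DataSource dataSource() {
        // reWriteBatchedInserts=true is the pgjdbc counterpart of MySQL's rewriteBatchedStatements.
        // Everything else below is a placeholder.
        return DataSourceBuilder.create()
                .driverClassName("org.postgresql.Driver")
                .url("jdbc:postgresql://localhost:5432/mydb?reWriteBatchedInserts=true")
                .username("user")
                .password("password")
                .build();
    }
}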
You will also need to decide whether to split the records into packets (batches) inside the loop
for (Record record : records) {
}
The driver may do this for you, in which case you won't need to; but it's better to verify that in the debugger as well.
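If you do split the list yourself, a minimal sketch could look like the following, assuming the someRepository shown above; the batch size of 1,000 is only an illustrative value to tune against your data and driver:
private static final int BATCH_SIZE = 1_000; // illustrative value

void saveInBatches(List<Record> records) {
    // Hand fixed-size slices to the custom repository instead of one huge batch.
    for (int from = 0; from < records.size(); from += BATCH_SIZE) {
        int to = Math.min(from + BATCH_SIZE, records.size());
        someRepository.batchSave(records.subList(from, to));
    }
}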
P.S.: Don't use var everywhere.