问题描述
ItemReader
正在从DB2读取数据,并提供了Java对象ClaimDto
.现在,ClaimProcessor
接受ClaimDto
的对象,并返回CompositeClaimRecord
对象,该对象由claimRecord1
和claimRecord2
组成,该对象将被发送到两个不同的Kafka主题.如何分别将claimRecord1
和claimRecord2
写入topic1和topic2.
ItemReader
is reading data from DB2 and gave java object ClaimDto
. Now the ClaimProcessor
takes in the object of ClaimDto
and return CompositeClaimRecord
object which comprises of claimRecord1
and claimRecord2
which to be sent to two different Kafka topics. How to write claimRecord1
and claimRecord2
to topic1 and topic2 respectively.
推荐答案
只需编写一个自定义的ItemWriter
即可.
Just write a custom ItemWriter
that does exactly that.
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
for (CompositeClaimRecord record : items) {
writer1.write(Collections.singletonList(record.claimRecord1));
writer2.write(Collections.singletonList(record.claimRecord2));
}
}
}
或者不是一次写入1条记录,而是将单个列表转换为2个列表并将其传递.但是以这种方式,错误处理可能会有点挑战. \
Or instead of writing 1 record at a time convert the single list into 2 lists and pass that along. But error handling might be a bit of a challenge that way. \
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
writer1.write(record1List);
writer2.write(record2List);
}
}
这篇关于Spring Batch:一个阅读器,一个复合处理器(两个具有不同实体的类)和两个kafkaItemWriter的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!