我正在开发Flink服务,在该服务中,我从一个Kafka中读取消息,然后将它们反序列化为HashMap并对其进行处理,最后将它们写到另一个Kafka中。现在,我遇到了一个我不知道如何解决的问题,也没有找到关于如何解决它的在线示例。
我想做的是为Flink Kafka Producer创建一个自定义分区,以便具有相同ID的事件进入相同的分区,因为重要的是要按顺序保留来自相同ID的事件。但是我不明白如何实现FlinkKafkaPartitioner,并且文档在这方面没有多大帮助。
到目前为止,我对Producer的了解如下(因为我只想使其正常工作,所以对FlinkKafkaProducer使用null,但是应该用自定义分区程序代替):
FlinkKafkaProducer010<String> writeToNewPipe = new FlinkKafkaProducer010<String>(
processorConfig.getKafkaDestTopic(),
new SimpleStringSchema(),
producerProps,
(FlinkKafkaPartitioner) null
);
在我的代码中,我执行以下操作:
eventsFromOldPipe
.map(event -> {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
try {
return mapper.writeValueAsString(event);
}
catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
})
.addSink(writeToNewPipe);
这里的eventsFromOldPipe发出HashMap。
现在说来自eventsFromOldPipe的HashMap包含一个我想用于分区键的sessionId字段,理想情况下,如果可能的话,我也想从生产者发送给Kafka的记录中删除该sessionId(删除它并不重要,但是会很好)。
我对Flink实现的更多“自定义”部分还很陌生,因此我对此一无所知,因此不胜感激。
最佳答案
只需实现KafkaSerializationSchema,使用键和值定义ProducerRecord。
kafka将按您定义的键对记录进行分区。
关于java - 如何实现FlinkKafkaPartitioner?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58824501/