我正在开发Flink服务,在该服务中,我从一个Kafka中读取消息,然后将它们反序列化为HashMap并对其进行处理,最后将它们写到另一个Kafka中。现在,我遇到了一个我不知道如何解决的问题,也没有找到关于如何解决它的在线示例。
我想做的是为Flink Kafka Producer创建一个自定义分区,以便具有相同ID的事件进入相同的分区,因为重要的是要按顺序保留来自相同ID的事件。但是我不明白如何实现FlinkKafkaPartitioner,并且文档在这方面没有多大帮助。

到目前为止,我对Producer的了解如下(因为我只想使其正常工作,所以对FlinkKafkaProducer使用null,但是应该用自定义分区程序代替):

FlinkKafkaProducer010<String> writeToNewPipe = new FlinkKafkaProducer010<String>(
                processorConfig.getKafkaDestTopic(),
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );


在我的代码中,我执行以下操作:

eventsFromOldPipe
                .map(event -> {
                    ObjectMapper mapper = new ObjectMapper();
                    mapper.registerModule(new JavaTimeModule());
                    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
                    try {
                        return mapper.writeValueAsString(event);
                    }
                    catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .addSink(writeToNewPipe);


这里的eventsFromOldPipe发出HashMap。

现在说来自eventsFromOldPipe的HashMap包含一个我想用于分区键的sessionId字段,理想情况下,如果可能的话,我也想从生产者发送给Kafka的记录中删除该sessionId(删除它并不重要,但是会很好)。

我对Flink实现的更多“自定义”部分还很陌生,因此我对此一无所知,因此不胜感激。

最佳答案

只需实现KafkaSerializationSchema,使用键和值定义ProducerRecord。
kafka将按您定义的键对记录进行分区。

关于java - 如何实现FlinkKafkaPartitioner?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58824501/

10-12 03:47
查看更多