我有producer,它不断向某个主题发送消息。

假设我的consumer消耗了1到10条消息,然后消耗了它崩溃的第11条消息,当它返回producer产生了100条消息时,假设现在的消息为110,我知道当consumer加入consumer group它将获取最后一个提交的偏移量,因此它将重新开始从11读取,但是我想使用java在日志中打印这些偏移量值,并确保它不会丢失任何消息

还有我们如何在kafka中获得主题明智的TTL

最佳答案

从ConsumerRecord中,您可以获取与主题相对应的所有元数据

kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
            String kafkaMessages = null
            try{
                while(true){
                   ConsumerRecords kafkaRecords
                   kafkaRecords = kafkaConsumer.poll(100)
                     for(ConsumerRecord record: kafkaRecords){
                       partition = record.partition()
                       offset = record.offset()
                       topicName = record.topic()
                         Object message = record.value()
                       }

10-04 20:04