我有producer
,它不断向某个主题发送消息。
假设我的consumer
消耗了1到10条消息,然后消耗了它崩溃的第11条消息,当它返回producer
产生了100条消息时,假设现在的消息为110,我知道当consumer
加入consumer group
它将获取最后一个提交的偏移量,因此它将重新开始从11读取,但是我想使用java
在日志中打印这些偏移量值,并确保它不会丢失任何消息
还有我们如何在kafka
中获得主题明智的TTL
最佳答案
从ConsumerRecord中,您可以获取与主题相对应的所有元数据
kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
String kafkaMessages = null
try{
while(true){
ConsumerRecords kafkaRecords
kafkaRecords = kafkaConsumer.poll(100)
for(ConsumerRecord record: kafkaRecords){
partition = record.partition()
offset = record.offset()
topicName = record.topic()
Object message = record.value()
}