问题描述
我正在使用 Java 编写一个 kafka
使用者.我想保持消息的实时性,所以如果等待消费的消息太多,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移量开始消费.
I'm writing a kafka
consumer using Java. I want to keep the real time of the message, so if there are too many messages waiting for consuming, such as 1000 or more, I should abandon the unconsumed messages and start consuming from the last offset.
对于这个问题,我尝试比较一个topic的最后提交偏移量和结束偏移量(只有1个分区),如果这两个偏移量的差值大于一定量,我会设置最后提交的偏移量主题作为下一个偏移量,这样我就可以放弃那些多余的消息.
For this problem, I try to compare the last committed offset and the end offset of a topic(only 1 partition), if the difference between these two offsets is larger than a certain amount, I will set the last committed offset of the topic as next offset so that I can abandon those redundant messages.
现在我的问题是如何获取topic的end offset,有人说可以用老consumer,但是太复杂了,新consumer有这个功能吗?
Now my problem is how to get the end offset of a topic, some people say I can use old consumer, but it's too complicated, do new consumer has this function?
推荐答案
新的消费者也很复杂.
//分配主题consumer.assign();
//寻求话题结束consumer.seekToEnd();
//位置为最新偏移量consumer.position();
这篇关于如何获取 kafka 主题分区的最后/结束偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!