如何获得kafka主题的最新偏移量

如何获得kafka主题的最新偏移量

本文介绍了如何获得kafka主题的最新偏移量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Java编写 kafka 使用者。我想保留消息的实时,所以如果有太多消息等待消费,例如1000或更多,我应该放弃未使用的消息并开始使用最新的消息。

I'm writing a kafka consumer using Java. I want to keep the real time of the message, so if there are too many messages waiting for consuming, such as 1000 or more, I should abandon the unconsumed messages and start consuming from the latest offset.

对于这个问题,我尝试比较最后提交的偏移量和主题的最新偏移量(只有1个分区),如果这两个偏移量之间的差异大于某个量,我将设置作为下一个偏移量的主题的最新偏移量,以便我可以放弃那些冗余消息。

For this problem, I try to compare the last committed offset and the latest offset of a topic(only 1 partition), if the difference between these two offsets is larger than a certain amount, I will set the latest offset of the topic as next offset so that I can abandon those redundant messages.

现在我的问题是如何获得主题的最新偏移量,有人说我可以使用旧的消费者,但它太复杂了,新的消费者是否有这个功能?

Now my problem is how to get the latest offset of a topic, some people say I can use old consumer, but it's too complicated, do new consumer has this function?

推荐答案

新消费者也很复杂。

//分配主题
consumer.assign();

//寻求结束主题
consumer.seekToEnd();

//位置是纬度est offset
consumer.position();

这篇关于如何获得kafka主题的最新偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-28 02:36