Question
I need to read the messages in a given time range out of a Kafka topic. The solution I can think of is to first find the largest offset for the beginning of the time range, and then keep consuming messages until the offsets on all partitions pass the end of the time range. Is there a better approach to this problem? Thanks!
Answer
Well, you definitely have to start by finding the first offset that falls within the start of the time range.
You can do this with the KafkaConsumer#offsetsForTimes method.
The method accepts a Map<TopicPartition, Long> that maps each partition to a timestamp, and returns a Map<TopicPartition, OffsetAndTimestamp>. Each OffsetAndTimestamp holds the offset and timestamp of the first message whose timestamp is equal to or greater than the one specified. If a partition has no such message, its value in the returned map is null.
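To make the lookup semantics concrete, here is a minimal in-memory sketch. It is not Kafka's implementation, and the names OffsetsForTimesModel and firstOffsetAtOrAfter are hypothetical. It assumes a single partition whose offsets start at 0 with no gaps, so array index i stands for offset i, and it returns an empty OptionalLong in the case where offsetsForTimes would map the partition to null.

```java
import java.util.OptionalLong;

/** Hypothetical in-memory model of the offsetsForTimes lookup for one partition. */
final class OffsetsForTimesModel {
    /**
     * Returns the earliest offset whose timestamp is >= target, or empty if no
     * such message exists. Assumption: index i of timestamps is offset i.
     */
    static OptionalLong firstOffsetAtOrAfter(long[] timestamps, long target) {
        // Scan in offset order, so the earliest matching offset wins even when
        // timestamps are not monotonic.
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {100, 105, 103, 110, 120};
        System.out.println(firstOffsetAtOrAfter(timestamps, 104)); // OptionalLong[1]
        System.out.println(firstOffsetAtOrAfter(timestamps, 121)); // OptionalLong.empty
    }
}
```

Scanning in offset order is deliberate. The documented contract is "earliest offset" whose timestamp is at or after the target, not "smallest timestamp".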
From there, you can assign the partition to your consumer, seek to the returned offset, and iterate until the timestamp of a record exceeds the end of your time range.
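Since the question asks about all partitions, the per-partition read loop can also be modelled in memory. This is a hypothetical sketch (TimeRangeReadModel and read are made-up names, not Kafka APIs). It assumes each partition's offsets start at 0 and are contiguous. For every partition, it finds the first record at or after begin, then collects offsets until a record's timestamp exceeds end or the partition runs out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of "seek to start, read until past the end" per partition. */
final class TimeRangeReadModel {
    /**
     * partitions maps a partition number to its record timestamps
     * (assumption: array index i is offset i). Returns the offsets read per partition.
     */
    static Map<Integer, List<Long>> read(Map<Integer, long[]> partitions, long begin, long end) {
        Map<Integer, List<Long>> result = new TreeMap<>();
        for (Map.Entry<Integer, long[]> e : partitions.entrySet()) {
            long[] ts = e.getValue();
            List<Long> offsets = new ArrayList<>();
            // Step 1: find the earliest offset with timestamp >= begin.
            int offset = 0;
            while (offset < ts.length && ts[offset] < begin) {
                offset++;
            }
            // Step 2: consume until a record is past the end or the partition is exhausted.
            for (; offset < ts.length && ts[offset] <= end; offset++) {
                offsets.add((long) offset);
            }
            result.put(e.getKey(), offsets);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> partitions = new TreeMap<>();
        partitions.put(0, new long[] {100, 110, 120, 130});
        partitions.put(1, new long[] {112, 118, 140});
        System.out.println(read(partitions, 110, 125)); // {0=[1, 2], 1=[0, 1]}
    }
}
```

Each partition is handled independently. The overall read is done only when every partition has either passed end or reached its last record. One caveat applies to the default CreateTime timestamp type. There, producers set the timestamps, so they are not guaranteed to increase within a partition. Stopping at the first record past end can therefore skip later records that still fall inside the range.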
Some sample code:
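import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public static void main(String[] args) {
    String topic = args[0];
    long timestampBeginning = Long.parseLong(args[1]); // epoch milliseconds
    long timestampEnd = Long.parseLong(args[2]);
    TopicPartition partition = new TopicPartition(topic, 0);
    try (Consumer<Object, Object> consumer = createConsumer()) { // your own factory method
        OffsetAndTimestamp start = consumer.offsetsForTimes(
                Collections.singletonMap(partition, timestampBeginning)).get(partition);
        if (start == null) {
            return; // no message at or after timestampBeginning
        }
        long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition);
        consumer.assign(Collections.singleton(partition)); // must assign before seeking
        consumer.seek(partition, start.offset());
        boolean done = false;
        // Keep polling until a record passes timestampEnd or the partition's end is reached.
        while (!done && consumer.position(partition) < endOffset) {
            for (ConsumerRecord<Object, Object> record : consumer.poll(Duration.ofMillis(500))) {
                if (record.timestamp() > timestampEnd) {
                    done = true; // past the end of the time range
                    break;
                }
                // handle record
            }
        }
    }
}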