本文介绍了如何获取时间戳上指定的 kafka 偏移数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
当我尝试运行它时,我已经尝试根据时间戳从 Kafka 主题获取偏移量,它抛出空指针错误,
I've tried to get the offset from Kafka topic based on timestamp when I tried to run it was throwing null pointer error,
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : partitions) {
timestampsToSearch.put(partition, startTimestamp);
}
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
for (TopicPartition partition : partitions) {
Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);
任何帮助将不胜感激.
推荐答案
要查找与时间戳对应的偏移量,您需要使用 offsetsForTimes()
方法.
To find the offsets that correspond to a timestamp, you need to use the offsetsForTimes()
method.
例如,这将打印 mytopic
的分区 0 对应于 1 秒前的偏移量:
For example, this will print the offsets for partition 0 of mytopic
that correspond to 1 second ago:
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
System.err.println(offsets);
}
这将显示如下内容:
{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}
这篇关于如何获取时间戳上指定的 kafka 偏移数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!