当我尝试运行时,我尝试根据时间戳从Kafka主题获取偏移量,这会引发空指针错误,
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : partitions) {
timestampsToSearch.put(partition, startTimestamp);
}
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
for (TopicPartition partition : partitions) {
Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);
任何帮助将不胜感激。
最佳答案
要查找与时间戳相对应的偏移量,您需要使用offsetsForTimes()
方法。
例如,这将打印mytopic
分区0的偏移量,该偏移量对应于1秒前:
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
System.err.println(offsets);
}
这将显示如下内容:
{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}