说,我想检查Kafka中特定分区的第一条消息和最后一条消息的偏移量。我的想法是将assign(…)方法与seekToBeginning(…)seekToEnd(…)一起使用。不幸的是,这行不通。

如果我将AUTO_OFFSET_RESET_CONFIG设置为"latest",则seekToBeginning(…)无效;如果我将其设置为"earliest",则seekToEnd(…)不起作用。看来,对我的消费者来说唯一重要的是AUTO_OFFSET_RESET_CONFIG

我看过类似的主题,但问题是subscribe()而不是assign()方法。提出的解决方案是实现ConsumerRebalanceListner并将其作为参数传递给subscribe()方法。不幸的是,assign()方法只有一个签名,并且只能获取主题分区的列表。

问题是:可以将seekToBeginning()seekToEnd()assign()方法一起使用。如果是,怎么办?如果没有,为什么?

我的代码的相关片段:

KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);

consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...

记录器打印偏移量n,它是所考虑主题的最大(最新)偏移量。

最佳答案


您可以为此使用 beginningOffsets endOffsets

您必须在 poll() seekToBeginning 之后调用seekToEnd:
此函数懒惰求值,仅在调用poll(Duration)或position(TopicPartition)时寻求所有分区中的第一个偏移量

08-05 16:03