最近,我开始学习与kafka一起工作。我正在从事的项目使用sarama。
为了阅读消息,我使用ConsumerGroup
。
如果foo
返回false
,我需要在一段时间后再次阅读该消息。如何才能做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}
最佳答案
您可以通过在消费者组的Setup()
回调中包含以下内容,将消费者组的偏移量重置为较早的偏移量:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
sess.ResetOffset(topic, partition, offset, "")
return nil
}
您也可以通过控制台实现以下目的:
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--topic myTopicName \
--reset-offsets \
--to-offfset 100 \
--execute