我正在使用sarama-cluster
lib在后端服务中创建kafka组使用者。 godoc的以下示例代码有效:
for {
if msg, ok := <-consumer.Messages(); ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}
由于这是一个死循环,因此将其放在goroutine中以避免阻塞其他 Activity ,因此它不再消耗任何消息:
go func() {
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}()
(服务正在运行,因此此goroutine不会终止。它只是无法使用)
任何想法如何解决这个问题?
最佳答案
with the following notice已经不维护了很长时间https://github.com/Shopify/sarama/pull/1099:
请注意,由于github.com/Shopify/sarama是
合并并发布(> = v1.19.0)此库已正式发布
不推荐使用。本机实现支持各种用例
无法通过该库使用。
我建议您改用from their repository。它具有sarama-cluster
的所有功能,并且正在积极维护中。
您可以按照简单的消费者组示例ojit_a进行操作。
关于go - 在goroutine中扭曲sarama-cluster消耗操作,然后它无法消耗任何东西,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61242692/