我正在使用sarama-cluster lib在后端服务中创建kafka组使用者。 godoc的以下示例代码有效:

for {
    if msg, ok := <-consumer.Messages(); ok {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    }
}

由于这是一个死循环,因此将其放在goroutine中以避免阻塞其他 Activity ,因此它不再消耗任何消息:

go func() {
    for msg := range consumer.Messages() {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    }
}()

(服务正在运行,因此此goroutine不会终止。它只是无法使用)

任何想法如何解决这个问题?

最佳答案

with the following notice已经不维护了很长时间https://github.com/Shopify/sarama/pull/1099:

请注意,由于github.com/Shopify/sarama
合并并发布(> = v1.19.0)此库已正式发布
不推荐使用。本机实现支持各种用例
无法通过该库使用。

我建议您改用from their repository。它具有sarama-cluster的所有功能,并且正在积极维护中。

您可以按照简单的消费者组示例ojit_a进行操作。

关于go - 在goroutine中扭曲sarama-cluster消耗操作,然后它无法消耗任何东西,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61242692/

10-16 08:33