我在Java中实现了KafkaConsumer,目前它从未退出.poll方法。当我在调试模式下深入研究源代码时,我发现它陷入了AbstractCoordinator.ensureCoordinatorKnown()的while循环中,因为从未找到协调器。

循环中从sendGroupMetadataRequest()返回的future第一次使用org.apache.kafka.clients.consumer.internals.SendFailedException失败,然后以后每次使用org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.都会失败。有谁知道为什么会这样?

如果我使用控制台生产者/使用者,则能够成功发送和接收消息,只有当我使用KafkaConsumer的实现时才可以。此外,使用者确实在我的两台服务器上工作,因此我知道这不是使用者的实现。

这是创建我的使用者所使用的属性:

Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");


编辑:

该主题肯定是在消费者开始之前创建的。

编辑2:
我删除了集群中的所有代理,然后重新创建了它们,但是现在我又失败了。在尝试重新加入的AbstractCoordinator.ensureActiveGroup()中,从performGroupJoin()返回的将来反复失败,并显示org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.。仍然不确定发生了什么。

编辑3:
我删除了代理,并使用其他ID重新创建了它们,现在.poll()方法正在返回,并且已成功使用了消息。我仍然想知道为什么它首先失败了,所以我可以确保它不会再次发生。

最佳答案

删除经纪人并创建新经纪人可以解决此问题。仍然不确定经纪人出了什么问题。

09-11 03:43
查看更多