我在Java中实现了KafkaConsumer
,目前它从未退出.poll
方法。当我在调试模式下深入研究源代码时,我发现它陷入了AbstractCoordinator.ensureCoordinatorKnown()
的while循环中,因为从未找到协调器。
循环中从sendGroupMetadataRequest()
返回的future第一次使用org.apache.kafka.clients.consumer.internals.SendFailedException
失败,然后以后每次使用org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
都会失败。有谁知道为什么会这样?
如果我使用控制台生产者/使用者,则能够成功发送和接收消息,只有当我使用KafkaConsumer的实现时才可以。此外,使用者确实在我的两台服务器上工作,因此我知道这不是使用者的实现。
这是创建我的使用者所使用的属性:
Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
编辑:
该主题肯定是在消费者开始之前创建的。
编辑2:
我删除了集群中的所有代理,然后重新创建了它们,但是现在我又失败了。在尝试重新加入的
AbstractCoordinator.ensureActiveGroup()
中,从performGroupJoin()
返回的将来反复失败,并显示org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.
。仍然不确定发生了什么。编辑3:
我删除了代理,并使用其他ID重新创建了它们,现在
.poll()
方法正在返回,并且已成功使用了消息。我仍然想知道为什么它首先失败了,所以我可以确保它不会再次发生。 最佳答案
删除经纪人并创建新经纪人可以解决此问题。仍然不确定经纪人出了什么问题。