问题描述
我曾经在生产中看到过这个(我不记得我们是如何解决它的),现在我可以在集成测试中重复它,它总是从全新的 Kafka 安装开始.过程如下:
I've seen this in production once (I don't remember how we solved it) and now I can repeat it in the integration tests, which always start with a brand new Kafka installation. Here's how it goes:
第 1 步:尚不存在的组的使用者订阅尚不存在的主题并开始轮询.
Step 1: A consumer of a group that doesn't exist yet subscribes to a topic that does not exist yet and starts polling.
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
})
self.kafka_consumer.subscribe('mytopic')
第 2 步:生产者向主题写入消息.
Step 2: A producer writes a message to the topic.
结果:
- 大约有一半的时间可以正常工作;消费者阅读消息没问题.
- 另一半消费者似乎卡住了.我试过最多等待 10 分钟,看看它是否会解开,但不会.
- 即使这两个步骤相反,即消费者尝试订阅已有消息的现有主题,行为也是相同的(但该组始终是新的).
更多详情
消费者轮询超时 2 秒,如果没有结果则循环结束.
The consumer is polling with a timeout of 2 seconds, and if there's no result it loops over.
当主题不存在时,poll()
返回 None
.主题存在后,poll()
返回一个 msg
,其 error().code()
是 _PARTITION_EOF
.
While the topic doesn't exist, poll()
returns None
. After the topic exists, poll()
returns an msg
whose error().code()
is _PARTITION_EOF
.
虽然消费者似乎卡住了,但我问 kafka mygroup
发生了什么,它告诉我:
While the consumer seems stuck, I ask kafka what's going on with mygroup
, and here's what it tells me:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
root@e7b124b4039c:/#
我试图通过将另一个不存在的主题阅读为 mygroup
来使其解脱:
I try to make it unstuck by trying to read another nonexistent topic as mygroup
:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group mygroup --topic nonexistent --from-beginning
[2018-03-15 16:36:59,369] WARN [Consumer clientId=consumer-1, groupId=pixelprocessor] Error while fetching metadata with correlation id 2 : {nonexistent=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
root@e7b124b4039c:/#
在我这样做之后,这是 Kafka 对 mygroup
的评价:
After I do that, here's what Kafka has to say about mygroup
:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mytopic 0 - 1 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
(another topic) 0 - 0 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
(a third topic) 0 - 0 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
nonexistent 0 0 0 0 - - -
这是 Kafka 1.0.1、librdkafka 0.11.3、confluent_kafka 0.11.0,在 Ubuntu 16.04 dockers(带有操作系统的打包 zookeeper 3.4.8)上运行,它们在带有 Linux 4.9.0 的 Debian 延伸(9.4)上运行-6-amd64.
This is Kafka 1.0.1, librdkafka 0.11.3, confluent_kafka 0.11.0, on Ubuntu 16.04 dockers (with the OS's packaged zookeeper 3.4.8) which are running on a Debian stretch (9.4) with Linux 4.9.0-6-amd64.
推荐答案
问题似乎出在 Consumer()
参数中.这不能正常工作:
The problem seems to have been in the Consumer()
arguments. This doesn't work properly:
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'auto.offset.reset': 'earliest',
})
但这确实是:
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'default.topic.config': {
'auto.offset.reset': 'earliest',
},
})
这篇关于有时新的消费组不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!