Problem description
AFAIK, the concept of partitions and (consumer) groups in Kafka was introduced to implement parallelism. I am working with Kafka through Python. I have a certain topic, which has (say) 2 partitions. This means that if I start a consumer group with 2 consumers in it, they will be mapped (subscribed) to different partitions.
But using the kafka library in Python, I came across a weird issue. I started 2 consumers with essentially the same group ID, and started threads for them to consume messages.
But every message in the Kafka stream is being consumed by both of them! This seems ridiculous to me, and even conceptually incorrect. Is there any way I can map the consumers to certain (distinct) partitions manually (if they are not mapped to different partitions automatically)?
Here is the code:
from kafka import KafkaConsumer
import thread

def con1(consumer):
    for msg in consumer:
        print msg

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))
Here is the output for some messages that I produced using kafka-console-producer:
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
whereas I expected each message to appear only once. BTW, this topic k-test has 2 partitions.
Recommended answer
Try running the bin/kafka-consumer-groups.sh command-line tool to verify whether the Python Kafka client you are using supports proper consumer group management. If both consumers are indeed in the same group, then they should get messages from mutually exclusive partitions.
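On the manual-mapping part of the question, here is a minimal sketch. It assumes the client is kafka-python (the `kafka` package imported in the question) and uses its `KafkaConsumer.assign()` method with `TopicPartition` objects, which bypasses group-managed assignment. The helper names `split_partitions`, `consume_assigned` and `run_two_consumers` are hypothetical, made up for illustration. The broker address, topic and group ID are the ones from the question. As far as I know, with manual assignment the group ID is only used for committing offsets, not for balancing partitions.

```python
import threading


def split_partitions(num_partitions, num_consumers):
    """Divide partition ids 0..num_partitions-1 into disjoint lists, one per consumer."""
    return [list(range(i, num_partitions, num_consumers))
            for i in range(num_consumers)]


def consume_assigned(name, topic, partition_ids, servers, group_id=None):
    """Create a consumer pinned to the given partitions and print what it reads."""
    # Imported lazily so split_partitions() is usable without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor: in kafka-python, subscribing to a
    # topic and calling assign() are mutually exclusive.
    consumer = KafkaConsumer(group_id=group_id, bootstrap_servers=servers)
    consumer.assign([TopicPartition(topic, p) for p in partition_ids])
    for msg in consumer:
        print("%s: partition=%d offset=%d value=%r"
              % (name, msg.partition, msg.offset, msg.value))


def run_two_consumers():
    """Start one thread per consumer, each reading a distinct partition of k-test."""
    servers = ["10.50.23.120:9092"]  # broker address taken from the question
    threads = []
    for i, ids in enumerate(split_partitions(2, 2)):
        t = threading.Thread(
            target=consume_assigned,
            args=("consumer%d" % (i + 1), "k-test", ids, servers, "grp1"),
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
```

Calling `run_two_consumers()` against a reachable broker should give each thread its own partition. Each consumer is created inside the thread that iterates over it, so no consumer object is shared between threads.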