Problem description
I have a Kafka application from which I have been consuming messages using kafka-console-consumer.sh as follows:
$ ./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic
This gives me all the messages I write to the Kafka broker through a Kafka producer, without missing any.
Recently I deployed the application in a different environment where zookeeperhost is not accessible (for some reason), so I am using kafka-simple-consumer-shell.sh instead, as below:
$ ./kafka-simple-consumer-shell.sh --broker-list brokerhost:9092 --topic myTopic --partition 0 --max-messages 1
But with this I see a few messages (around 2-4 in 5000) go missing. Could someone please explain how kafka-simple-consumer-shell.sh reads messages?
I suspect that some messages are going to a different partition, and since I am only reading from partition 0, I am not getting all the messages every time. But I do not know how to check how many partitions there are, or what the ids of the other partitions are. I tried 1, but it does not work.
Can someone help?
Recommended answer
kafka-simple-consumer-shell.sh simply creates a consumer that reads messages from one partition. So your command reads only a single message from partition 0 of myTopic on brokerhost:9092. If partition 1 is not on the same broker, the same command will not work for it. (For more information, check the code on GitHub.)
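Because each run of the tool reads from only one partition, one rough way to find out how many partitions exist without ZooKeeper is to run it once per partition id, starting at 0, until an id fails. The Python sketch below illustrates that idea; it is not part of Kafka. It assumes the tool exits with a non-zero status when the partition does not exist, and that a timeout means it found the partition but is still waiting for a message. Both behaviours are assumptions to verify against your Kafka version, and `shell_partition_exists` and `count_partitions` are hypothetical names.

```python
import subprocess
from typing import Callable


def shell_partition_exists(script: str, broker_list: str, topic: str,
                           partition: int, timeout_s: float = 30.0) -> bool:
    """Probe one partition id with kafka-simple-consumer-shell.sh.

    ASSUMPTION: a non-zero exit status means the partition does not exist,
    and a timeout means the tool found the partition and is waiting for data.
    """
    cmd = [script, "--broker-list", broker_list, "--topic", topic,
           "--partition", str(partition), "--max-messages", "1"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True,
                                timeout=timeout_s)
    except subprocess.TimeoutExpired:
        # The child was killed after timeout_s; treat the partition as present.
        return True
    return result.returncode == 0


def count_partitions(partition_exists: Callable[[int], bool],
                     upper_bound: int) -> int:
    """Count partitions by probing ids 0, 1, 2, ... until one is missing.

    Partition ids of a topic are contiguous from 0, so the first missing id
    equals the partition count. Probing never goes past upper_bound - 1.
    """
    count = 0
    while count < upper_bound and partition_exists(count):
        count += 1
    return count
```

For example, `count_partitions(lambda p: shell_partition_exists("./kafka-simple-consumer-shell.sh", "brokerhost:9092", "myTopic", p), upper_bound=64)` would probe ids 0 to 63. Passing the probe in as a function keeps the counting logic testable without a running cluster.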
If you can access the ZooKeeper host, you can check how partitions are distributed across the cluster with:
bin/kafka-topics.sh --describe --zookeeper zookeeperhost:2181 --topic myTopic
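If you want to use that output from a script, a small parser can turn the per-partition lines into a map from partition id to leader broker id. This is a sketch that assumes tab-separated lines shaped like `Topic: myTopic  Partition: 0  Leader: 1  Replicas: 1  Isr: 1`; the exact layout can vary between Kafka versions, and `partition_leaders` is a hypothetical helper.

```python
import re
from typing import Dict

# Matches per-partition lines such as
# "\tTopic: myTopic\tPartition: 0\tLeader: 1\tReplicas: 1,2\tIsr: 1,2".
# ASSUMPTION: this layout; header fields like "PartitionCount:" do not match.
_PARTITION_LINE = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)")


def partition_leaders(describe_output: str) -> Dict[int, int]:
    """Map partition id -> leader broker id from kafka-topics.sh --describe text.

    A leader of -1 means the partition currently has no leader.
    """
    leaders: Dict[int, int] = {}
    for line in describe_output.splitlines():
        match = _PARTITION_LINE.search(line)
        if match:
            leaders[int(match.group(1))] = int(match.group(2))
    return leaders
```

The number of keys in the returned map is the partition count, and the values show which broker leads each partition.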
But if you can't access the ZooKeeper host, there are two ways I can think of:
- Provide a list of all brokers as the parameter and try partition numbers from 0 to N. You can pass multiple brokers to --broker-list in the format broker1:port1,broker2:port2,broker3:port3. You can then figure out how many partitions exist in the entire cluster, but you still won't know which broker holds which partitions.
- Manually check the log directory of each broker: /tmp/kafka-logs if you are using the default log directory. You will find directories like myTopic-0, myTopic-1, ..., named in the format topic-partition#. With this you can manually work out which broker has which partitions.
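The log-directory check can be scripted per broker. The following minimal Python sketch lists the partition ids of one topic found in a broker's log directory, assuming the topic-partition# directory naming described above; `partitions_in_log_dir` is a hypothetical helper, and it has to be run on each broker (or against a copy of its directory listing).

```python
import os
from typing import List


def partitions_in_log_dir(log_dir: str, topic: str) -> List[int]:
    """Return the sorted partition ids of `topic` stored in one log directory.

    Partition directories are assumed to be named "<topic>-<partition>",
    e.g. "myTopic-0". Splitting on the last "-" keeps topic names that
    themselves contain "-"; entries with any other suffix are ignored.
    """
    ids = []
    for name in os.listdir(log_dir):
        prefix, sep, suffix = name.rpartition("-")
        if (sep and prefix == topic and suffix.isdecimal()
                and os.path.isdir(os.path.join(log_dir, name))):
            ids.append(int(suffix))
    return sorted(ids)
```

For example, `partitions_in_log_dir("/tmp/kafka-logs", "myTopic")` on a broker holding partitions 0 and 1 of myTopic would return `[0, 1]`. If the broker's `log.dirs` setting lists several directories, scan each of them.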