

这是对 zookeeper在哪里执行的后续问题存储Kafka群集和相关信息?根据Armando Ballaci提供的答案.

This is a followup question to "Where do zookeeper store Kafka cluster and related information?" based on the answer provided by Armando Ballaci.


Now it's clear that consumer offsets are stored in the Kafka cluster in a special topic called __consumer_offsets. That's fine, I am just wondering how does the retrieval of these offsets work.


Topics are not like RDBS over which we can query for arbitrary data based on a certain predicate. Ex - if the data is stored in an RDBMS, probably a query like below will get the consumer offset for a particular partition of a topic for a particular consumer of some consumer group.

select consumer_offset__read, consumer_offset__commited from consumer_offset_table where consumer-grp-id="x" and partitionid="y"

但是很明显,这种检索是不可能的.n.Kafka Topics.那么主题检索机制如何工作?有人可以详细说明吗?

But clearly this kind of retrieval is not possible o.n Kafka Topics. So how does the retrieval mechanism from topic work? Could someone elaborate?


(Data from Kafka partitions is read in FIFO, and if Kafka consumer model is followed to retrieve a particular offset, a lot of additional data has to be processed and it's going to be slow. So am wondering if it's done in some other way...)



Some description I could find on web regarding the same when I stumbled upon this for my day job is as follows:


In Kafka releases through, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count). Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.


The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.

try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                5000 /* read timeout in millis */);
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different, from the above channel's host then reconnect
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          5000 /* read timeout in millis */);
        } else {
            // retry (after backoff)
    catch (IOException e) {
        // retry the query (after backoff)


07-28 02:57