我们有一个代码,获取有关kafka主题使用者的一些详细信息。下面的代码显示了如何获取分区和相应的偏移量。我们需要的缺失信息是使用者组中分区的客户机ID /使用者。我们是否有办法获取每个主题分区的使用者?

    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    ArrayList<OffsetAndMetadata> offsets = new ArrayList<OffsetAndMetadata>();

    for (int i=0;i<consumer.partitionsFor(topic).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(topic, i);
        partitions.add(partitiontemp);
        OffsetAndMetadata offsettemp = consumer.committed(partitiontemp);
        offsets.add(offsettemp);
    }

    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (int i=0;i<consumer.partitionsFor(topic).size();i++)
    {
        try {

            long cur_offset = offsets.get(partitions.get(i).partition()).offset();
            long log_offset = consumer.position(partitions.get(partitions.get(i).partition()));

            System.out.printf("Topic: %s partitionID: %d current offset: %d log offset: %d uncommitted: %d\n",
                    topic, partitions.get(i).partition(),cur_offset , log_offset , log_offset - cur_offset);


        }catch (Exception ex){
            System.out.printf("Topic: %s partitionID: %d current offset: - log offset: - uncommitted: -\n", topic, partitions.get(i).partition());

        }
    }

最佳答案

您可能是指使用者的group.id属性,因为偏移量是按组而不是每个client.id。本讨论stackoverflow.com/questions/55937806/apache-kafka-get-list-of-consumers-on-a-specific-topic/55938325

可以回答你的问题。

09-06 02:36