我们有一个代码,获取有关kafka主题使用者的一些详细信息。下面的代码显示了如何获取分区和相应的偏移量。我们需要的缺失信息是使用者组中分区的客户机ID /使用者。我们是否有办法获取每个主题分区的使用者?
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
ArrayList<OffsetAndMetadata> offsets = new ArrayList<OffsetAndMetadata>();
for (int i=0;i<consumer.partitionsFor(topic).size();i++)
{
TopicPartition partitiontemp = new TopicPartition(topic, i);
partitions.add(partitiontemp);
OffsetAndMetadata offsettemp = consumer.committed(partitiontemp);
offsets.add(offsettemp);
}
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (int i=0;i<consumer.partitionsFor(topic).size();i++)
{
try {
long cur_offset = offsets.get(partitions.get(i).partition()).offset();
long log_offset = consumer.position(partitions.get(partitions.get(i).partition()));
System.out.printf("Topic: %s partitionID: %d current offset: %d log offset: %d uncommitted: %d\n",
topic, partitions.get(i).partition(),cur_offset , log_offset , log_offset - cur_offset);
}catch (Exception ex){
System.out.printf("Topic: %s partitionID: %d current offset: - log offset: - uncommitted: -\n", topic, partitions.get(i).partition());
}
}
最佳答案
您可能是指使用者的group.id
属性,因为偏移量是按组而不是每个client.id
。本讨论stackoverflow.com/questions/55937806/apache-kafka-get-list-of-consumers-on-a-specific-topic/55938325
可以回答你的问题。