有时,我需要从具有相同主题的特定偏移量的特定分区读取记录。
我可以每次创建新的kafka用户。但是,我可以创建使用者池并以这种方式使用它:
List<KafkaConsumer> consumers = new ArrayList<>();
// acquire consumer
KafkaConsumer consumer = consumers.get(0);
TopicPartition topicPartition = new TopicPartition("my-topic", 42);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, 13);
ConsumerRecords records = consumer.poll(0);
// process records
// .....
// release consumer
consumer.unsubscribe();
我应该建立消费者群吗?否则效果不佳,我应该为每次使用创建新的消费者。
最佳答案
您只需要一个消费者。只需取消订阅并将其重新分配给另一个TopicPartition
。
String topic = "my-topic";
int partition = 42;
int offset = 13;
boolean running = true;
while(running) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords records = consumer.poll(0);
// process records
// .....
// release consumer
consumer.unsubscribe();
// Change topic, partition, offset as needed
}
关于java - 使用Kafka客户池是否正确?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54981105/