本文介绍了当并发设置为1以上时,如何暂停特定的Kafka消费者线程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我使用的是Spring-kafka 2.2.8,并将并发设置为2,如下所示,并尝试了解如何在满足特定条件时暂停使用者线程/实例。
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
public void listen(String in) {
System.out.println(in);
}
现在,我有两个问题。
我的使用者是否会跨越两个不同的投票线程来轮询记录?
如果我如上所示设置消费者的id。如何暂停特定的使用者线程(并发设置为1以上)。
请提出建议。
推荐答案
使用KafkaListenerEndpointRegistry.getListenerContainer(id)
方法获取对容器的引用。
将其强制转换为ConcurrentMessageListenerContainer
并调用getContainers()
以获取子KafkaMessageListenerContainer
的列表;然后您可以单独暂停/恢复它们。
您可以使用getAssignedPartitions()
确定每个主题/分区的名称。
这篇关于当并发设置为1以上时,如何暂停特定的Kafka消费者线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!