本文介绍了当并发设置为1以上时,如何暂停特定的Kafka消费者线程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是Spring-kafka 2.2.8,并将并发设置为2,如下所示,并尝试了解如何在满足特定条件时暂停使用者线程/实例。

@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
    public void listen(String in) {
        System.out.println(in);
    }

现在,我有两个问题。

  1. 我的使用者是否会跨越两个不同的投票线程来轮询记录?

  2. 如果我如上所示设置消费者的id。如何暂停特定的使用者线程(并发设置为1以上)。

请提出建议。

推荐答案

使用KafkaListenerEndpointRegistry.getListenerContainer(id)方法获取对容器的引用。

将其强制转换为ConcurrentMessageListenerContainer并调用getContainers()以获取子KafkaMessageListenerContainer的列表;然后您可以单独暂停/恢复它们。

您可以使用getAssignedPartitions()确定每个主题/分区的名称。

这篇关于当并发设置为1以上时,如何暂停特定的Kafka消费者线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-03 10:55