本文介绍了Kafka Spring:如何动态或循环创建侦听器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有 4 个 ConsumerFactory 侦听器,它们正在阅读 4 个不同的主题,如下所示:

I have 4 ConsumerFactory listeners that are reading from 4 different topics like this:

@KafkaListener(
      id = "test1",
      topicPattern  = "test.topic1",
      groupId = "pp-test1")
  public void listenTopic1(ConsumerRecord<String, String> record) {
    System.out.println("Topic is: " + record.topic());   
  }

但我们将有 50 个主题,我想设置至少 25 个听众以获得更好的表现.如何动态执行此操作而不是手动编写 25 个侦听器?

But we'll have 50 topics and I want to set up atleast 25 listeners for betetr performance. How can I do this dynamically instead of manually writing 25 listeners?

推荐答案

您不能以编程方式创建 @KafkaListeners,只能创建离散侦听器容器(带有自定义侦听器).

You cannot create @KafkaListeners programmatically, only discrete listener containers (with a custom listener).

您可以通过以编程方式为每个侦听器创建子应用程序上下文来实现.

You can do it by programmatically creating a child application context for each listener.

编辑

@SpringBootApplication
public class So53715268Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So53715268Application.class, args);
        for (int i = 0; i < 2; i++) {
            AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
            child.setParent(context);
            child.register(ChildConfig.class);
            Properties props = new Properties();
            props.setProperty("group", "group." + i);
            props.setProperty("topic", "topic" + i);
            PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
            child.getEnvironment().getPropertySources().addLast(pps);
            child.refresh();
        }
    }

}

@Configuration
@EnableKafka
public class ChildConfig {

    @Bean
    public Listener listener() {
        return new Listener();
    }

}

public class Listener {

    @KafkaListener(id = "${group}", topics = "${topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

: partitions assigned: [topic0-0]
: partitions assigned: [topic1-0]

请注意,如果您使用的是 Spring Boot,则子配置类和侦听器必须与主应用程序位于不同的包中(也不是子包).

Note that, if you are using Spring Boot, the child config class and listener must be in a different package to the main app (and not a sub-package either).

这篇关于Kafka Spring:如何动态或循环创建侦听器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-24 21:06