本文介绍了Spring Cloud Stream动态频道的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Spring Cloud Stream,并想以编程方式创建和绑定频道.我的用例是,在应用程序启动期间,我会收到要订阅的Kafka主题的动态列表.然后如何为每个主题创建频道?

I am using Spring Cloud Stream and want to programmatically create and bind channels. My use case is that during application startup I receive the dynamic list of Kafka topics to subscribe to. How can I then create a channel for each topic?

推荐答案

我最近遇到了类似的情况,下面是我动态创建SubscriberChannel的示例.

I ran into similar scenario recently and below is my sample of creating SubscriberChannels dynamically.

    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1);
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);

这篇关于Spring Cloud Stream动态频道的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-12 23:46