在这篇文章中,Garry Russell解释了如何以编程方式创建多个KafkaListeners以侦听多个主题。[此设置实际上对我来说是成功的]
Kafka Spring: How to create Listeners dynamically or in a loop?
现在,我也想对JMSListeners进行类似的设置-在其中可以使用一个类,其中包含一个@JMSListener,并且可以以编程方式创建该JMSListener的多个实例,每个实例都注入了自己的queueName。
我发现了这篇文章
Spring JMS start listening to jms queues on request
在这篇文章的结尾,加里发表了类似的评论,
如果您希望动态创建大量容器,则只需以编程方式创建容器,请调用afterPropertiesSet(),然后调用start()
我使用了上面第一篇文章(与KafkaListeners相关)中所做的设置,我的多个JMS侦听器实例正在启动,但没有使用任何消息。
基本上我不知道该在哪里做
然后以编程方式创建容器,调用afterPropertiesSet(),然后调用start()
我对这个词感到困惑-容器,我知道有JMSListener并且有
JmsListenerContainerFactory,在此上下文中什么是容器-我猜是JMSListener?
我已经确认队列中有消息。另外,当我不以编程方式创建侦听器,而只在其中包含一个带有硬编码队列的侦听器时,它会很好地消耗消息。
基本上,当我以编程方式创建多个JMS侦听器时,没有一个侦听器正在使用消息
@SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
private static Consumers consumersStatic;
@Autowired
Consumers consumers;
@PostConstruct
public void init() {
consumersStatic = this.consumers;
}
@Bean
public Gson gson() {
return new Gson();
}
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
logger.debug("queueInformationList ************" + queueInformationList.toString());
for (QueueInformation queueInformation : queueInformationList) {
AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
child.setParent(context);
child.register(MQConfig.class);
Properties props = new Properties();
props.setProperty("mqQueueName", queueInformation.getMqQueueName());
//
PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
child.getEnvironment().getPropertySources().addLast(pps);
child.refresh();
}
}
}
这是具有listenerContainerFactory的MQConfig
@Configuration
public class MQConfig {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${ibm.mq.user}")
private String mqUserName;
@Bean
public MQListener listener() {
return new MQListener();
}
@PostConstruct
public void afterConstruct() {
logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// Put the MQ username in the PCF environment.
// Otherwise, the connection is identified by PCF's default user, "VCAP"
System.setProperty("user.name", mqUserName);
return factory;
}
}
然后是具有实际@JMSListener的MQListener
public class MQListener {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${mqQueueName}")
private String mqQueueName;
@PostConstruct
public void afteConstruct() {
logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);
}
@JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
logger.debug("***********************************************receivedMessage:" + receivedMessage);
}
}
这是我的application.yml
ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
queueInformationList:
-
mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
-
mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD
最佳答案
好的,我发现了另一则帖子,其中加里回答了我要找的东西
Adding Dynamic Number of Listeners(Spring JMS)
本质上,这是我的工作解决方案。
很棒@GaryRussell-我现在是粉丝:)
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
int i = 0;
for (QueueInformation queueInformation :
queueInformationList) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint-" + i++);
endpoint.setDestination(queueInformation.getMqQueueName());
endpoint.setMessageListener(message -> {
logger.debug("***********************************************receivedMessage:" + message);
});
registrar.registerEndpoint(endpoint);
logger.debug("registered the endpoint for queue" + queueInformation.getMqQueueName());
}
}