我正在通过Spring Boot使用apache activemq,我想迁移到apache artemis来提高集群和节点的使用率。

目前,我主要使用VirtualTopics的概念以及诸如

@JMSListener(destination = "Consumer.A.VirtualTopic.simple")
public void receiveMessage() {
    ...
}


...

public void send(JMSTemplate template) {
    template.convertAndSend("VirtualTopic.simple", "Hello world!");
}


我读过,阿尔art弥斯将其地址模型更改为地址,队列和路由类型,而不是像activemq那样的队列,主题和虚拟主题。
我读了很多书,但我认为我不正确,我现在该如何迁移。我以与上面相同的方式进行了尝试,因此我从Maven导入了Artemis JMSClient并希望像以前一样使用它,但是使用FQQN(完全合格的队列名称)或VirtualTopic-Wildcard可以在某些来源上阅读。但是以某种方式它不能正常工作。

我的问题是:
-如何迁移VirtualTopics?我使用FQQN和那些VirtualTopics-Wildcards正确吗?
-如何为上述代码示例指定路由类型的任播和多播? (在在线示例中,地址和队列在服务器broker.xml中进行了硬编码,但是我想在应用程序运行时创建它。)
-如何将其与openwire协议一起使用,应用程序如何知道其用途?它仅取决于我使用的artemis端口吗?那么61616适用于openwire?

任何人都可以帮助澄清我的想法吗?

更新:

一些进一步的问题。

1)我总是读类似“默认的5.x使用者”之类的东西。难道那会和阿耳emi弥斯混在一起吗?就像您保留所有这些命名约定,只是将VirtualTopic名称中的地址添加到FQQN中一样,然后仅将依赖关系更改为artemis吗?

2)我已经用"import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;""import org.apache.activemq.ActiveMQConnectionFactory;"尝试了“ virtualTopicConsumerWildcards”,但仅在第二种情况下有所不同。

3)我也尝试仅在接受器中使用OpenWire作为协议,但是在这种情况下(以及"import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;"),在启动应用程序时出现以下错误:“ 2020-03-30 11:41:19,504 ERROR [org.apache.activemq.artemis.core.server] AMQ224096: Error setting up connection from /127.0.0.1:54201 to /127.0.0.1:61616; protocol CORE not found in map: [OPENWIRE]”。

4)我是否将multicast:://VirtualTopic.simple作为目的地名称放在template.convertAndSend(...)中?
我尝试将template.setPubSubDomain(true)用作多播路由类型,然后将其保留用于任意播,这是可行的。但这是个好方法吗?

5)您是否知道,我如何使用template.convertAndSend(...);“告诉”我的spring-boot-application以使用Openwire?

UPDATE2:
共享的持久订阅

@JmsListener(destination = "VirtualTopic.test", id = "c1", subscription = "Consumer.A.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive1(String m) {

}

@JmsListener(destination = "VirtualTopic.test", id = "c2", subscription = "Consumer.B.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive2(String m) {

}




@Bean
public DefaultJmsListenerContainerFactory queueConnectionFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setClientId("brokerClientId");
    factory.setSubscriptionDurable(true);
    factory.setSubscriptionShared(true);
    return factory;
}


错误:

2020-04-17 11:23:44.485  WARN 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'VirtualTopic.test' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
2020-04-17 11:23:44.514 ERROR 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'VirtualTopic.test' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Broker: d1 - Client: brokerClientId already connected from /127.0.0.1:59979


我在这里做错了什么?

最佳答案

虚拟主题的思想是,生产者以通常的JMS方式发送到主题,并且消费者可以从物理队列中消费逻辑主题订阅,从而允许许多消费者在许多计算机和线程上运行以平衡负载。

Artemis在内部使用每个主题订户模型的队列,并且可能使用其Fully Qualified Queue name(FQQN)直接寻址订阅队列。

例如,主题VirtualTopic.simple订阅A Consumer.A.VirtualTopic.simple的默认5.x消费者目的地将替换为由地址和队列VirtualTopic.simple::Consumer.A.VirtualTopic.simple组成的Artemis FQQN。

但是,Artemis支持virtual topic wildcard filter mechanism,它将自动将使用者目的地转换为相应的FQQN。要启用过滤器机制,配置字符串属性
可以使用virtualTopicConsumerWildcards。它有两个部分,每个部分用;分隔,即默认的5.x虚拟主题(消费者前缀为Consumer.*.)将需要virtualTopicConsumerWildcards过滤器为Consumer.*.>;2

默认情况下,Artemis配置为自动创建客户端请求的目的地。他们可以在连接到地址时指定特殊的前缀,以指示要使用的路由类型。可以通过将配置字符串属性anycastPrefixmulticastPrefix添加到接受器来启用它们,您可以在Using Prefixes to Determine Routing Type中找到更多详细信息。例如,添加到接受方anycastPrefix=anycast://;multicastPrefix=multicast://,如果客户端仅需要将消息发送到ANYCAST队列之一,则应使用目标anycast:://VirtualTopic.simple,如果客户端需要将消息发送到MULTICAST,则应使用目标。

Artemis acceptors支持对所有协议使用单个端口,它们将自动检测正在使用哪个协议(CORE,AMQP,STOMP或OPENWIRE),但是可以通过使用protocol参数来限制支持哪些协议。

以下接受器启用任意播前缀multicast:://VirtualTopic.simple,多播前缀anycast://和虚拟主题使用者通配符,从而在端点localhost:61616上禁用除OPENWIRE之外的所有协议。

<acceptor name="artemis">tcp://localhost:61616?anycastPrefix=anycast://;multicastPrefix=multicast://;virtualTopicConsumerWildcards=Consumer.*.%3E%3B2;protocols=OPENWIRE</acceptor>


更新:
以下示例应用程序使用OpenWire协议连接到具有先前接受器的Artemis实例。

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class Application {

   private final String BROKER_URL = "tcp://localhost:61616";
   private final String BROKER_USERNAME = "admin";
   private final String BROKER_PASSWORD = "admin";

   public static void main(String[] args) throws Exception {
      final ConfigurableApplicationContext context = SpringApplication.run(Application.class);
      System.out.println("********************* Sending message...");

      JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
      JmsTemplate jmsTemplateAnycast = context.getBean("jmsTemplateAnycast", JmsTemplate.class);
      JmsTemplate jmsTemplateMulticast = context.getBean("jmsTemplateMulticast", JmsTemplate.class);

      jmsTemplateAnycast.convertAndSend("VirtualTopic.simple", "Hello world anycast!");
      jmsTemplate.convertAndSend("anycast://VirtualTopic.simple", "Hello world anycast using prefix!");
      jmsTemplateMulticast.convertAndSend("VirtualTopic.simple", "Hello world multicast!");
      jmsTemplate.convertAndSend("multicast://VirtualTopic.simple", "Hello world multicast using prefix!");

      System.out.print("Press any key to close the context");
      System.in.read();

      context.close();
   }

   @Bean
   public ActiveMQConnectionFactory connectionFactory(){
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL(BROKER_URL);
      connectionFactory.setUserName(BROKER_USERNAME);
      connectionFactory.setPassword(BROKER_PASSWORD);
      return connectionFactory;
   }

   @Bean
   public JmsTemplate jmsTemplate(){
      JmsTemplate template = new JmsTemplate();
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateAnycast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(false);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateMulticast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(true);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrency("1-1");
      return factory;
   }

   @JmsListener(destination = "Consumer.A.VirtualTopic.simple")
   public void receiveMessageFromA(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM A: " + message);
   }

   @JmsListener(destination = "Consumer.B.VirtualTopic.simple")
   public void receiveMessageFromB(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM B: " + message);
   }

   @JmsListener(destination = "VirtualTopic.simple")
   public void receiveMessageFromTopic(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM TOPIC: " + message);
   }
}

10-06 02:46