我正在通过Spring Boot使用apache activemq,我想迁移到apache artemis来提高集群和节点的使用率。
目前,我主要使用VirtualTopics的概念以及诸如
@JMSListener(destination = "Consumer.A.VirtualTopic.simple")
public void receiveMessage() {
...
}
...
public void send(JMSTemplate template) {
template.convertAndSend("VirtualTopic.simple", "Hello world!");
}
我读过,阿尔art弥斯将其地址模型更改为地址,队列和路由类型,而不是像activemq那样的队列,主题和虚拟主题。
我读了很多书,但我认为我不正确,我现在该如何迁移。我以与上面相同的方式进行了尝试,因此我从Maven导入了Artemis JMSClient并希望像以前一样使用它,但是使用FQQN(完全合格的队列名称)或VirtualTopic-Wildcard可以在某些来源上阅读。但是以某种方式它不能正常工作。
我的问题是:
-如何迁移VirtualTopics?我使用FQQN和那些VirtualTopics-Wildcards正确吗?
-如何为上述代码示例指定路由类型的任播和多播? (在在线示例中,地址和队列在服务器broker.xml中进行了硬编码,但是我想在应用程序运行时创建它。)
-如何将其与openwire协议一起使用,应用程序如何知道其用途?它仅取决于我使用的artemis端口吗?那么61616适用于openwire?
任何人都可以帮助澄清我的想法吗?
更新:
一些进一步的问题。
1)我总是读类似“默认的5.x使用者”之类的东西。难道那会和阿耳emi弥斯混在一起吗?就像您保留所有这些命名约定,只是将VirtualTopic名称中的地址添加到FQQN中一样,然后仅将依赖关系更改为artemis吗?
2)我已经用
"import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;"
和"import org.apache.activemq.ActiveMQConnectionFactory;"
尝试了“ virtualTopicConsumerWildcards”,但仅在第二种情况下有所不同。3)我也尝试仅在接受器中使用OpenWire作为协议,但是在这种情况下(以及
"import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;"
),在启动应用程序时出现以下错误:“ 2020-03-30 11:41:19,504 ERROR [org.apache.activemq.artemis.core.server] AMQ224096: Error setting up connection from /127.0.0.1:54201 to /127.0.0.1:61616; protocol CORE not found in map: [OPENWIRE]
”。4)我是否将
multicast:://VirtualTopic.simple
作为目的地名称放在template.convertAndSend(...)
中?我尝试将
template.setPubSubDomain(true)
用作多播路由类型,然后将其保留用于任意播,这是可行的。但这是个好方法吗?5)您是否知道,我如何使用
template.convertAndSend(...);
“告诉”我的spring-boot-application以使用Openwire?UPDATE2:
共享的持久订阅
@JmsListener(destination = "VirtualTopic.test", id = "c1", subscription = "Consumer.A.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive1(String m) {
}
@JmsListener(destination = "VirtualTopic.test", id = "c2", subscription = "Consumer.B.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive2(String m) {
}
@Bean
public DefaultJmsListenerContainerFactory queueConnectionFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setClientId("brokerClientId");
factory.setSubscriptionDurable(true);
factory.setSubscriptionShared(true);
return factory;
}
错误:
2020-04-17 11:23:44.485 WARN 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'VirtualTopic.test' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
2020-04-17 11:23:44.514 ERROR 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'VirtualTopic.test' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Broker: d1 - Client: brokerClientId already connected from /127.0.0.1:59979
我在这里做错了什么?
最佳答案
虚拟主题的思想是,生产者以通常的JMS方式发送到主题,并且消费者可以从物理队列中消费逻辑主题订阅,从而允许许多消费者在许多计算机和线程上运行以平衡负载。
Artemis在内部使用每个主题订户模型的队列,并且可能使用其Fully Qualified Queue name(FQQN)直接寻址订阅队列。
例如,主题VirtualTopic.simple订阅A Consumer.A.VirtualTopic.simple
的默认5.x消费者目的地将替换为由地址和队列VirtualTopic.simple::Consumer.A.VirtualTopic.simple
组成的Artemis FQQN。
但是,Artemis支持virtual topic wildcard filter mechanism,它将自动将使用者目的地转换为相应的FQQN。要启用过滤器机制,配置字符串属性
可以使用virtualTopicConsumerWildcards
。它有两个部分,每个部分用;
分隔,即默认的5.x虚拟主题(消费者前缀为Consumer.*.
)将需要virtualTopicConsumerWildcards
过滤器为Consumer.*.>;2
。
默认情况下,Artemis配置为自动创建客户端请求的目的地。他们可以在连接到地址时指定特殊的前缀,以指示要使用的路由类型。可以通过将配置字符串属性anycastPrefix
和multicastPrefix
添加到接受器来启用它们,您可以在Using Prefixes to Determine Routing Type中找到更多详细信息。例如,添加到接受方anycastPrefix=anycast://;multicastPrefix=multicast://
,如果客户端仅需要将消息发送到ANYCAST队列之一,则应使用目标anycast:://VirtualTopic.simple
,如果客户端需要将消息发送到MULTICAST,则应使用目标。
Artemis acceptors支持对所有协议使用单个端口,它们将自动检测正在使用哪个协议(CORE,AMQP,STOMP或OPENWIRE),但是可以通过使用protocol参数来限制支持哪些协议。
以下接受器启用任意播前缀multicast:://VirtualTopic.simple
,多播前缀anycast://
和虚拟主题使用者通配符,从而在端点localhost:61616上禁用除OPENWIRE之外的所有协议。
<acceptor name="artemis">tcp://localhost:61616?anycastPrefix=anycast://;multicastPrefix=multicast://;virtualTopicConsumerWildcards=Consumer.*.%3E%3B2;protocols=OPENWIRE</acceptor>
更新:
以下示例应用程序使用OpenWire协议连接到具有先前接受器的Artemis实例。
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@SpringBootApplication
@EnableJms
public class Application {
private final String BROKER_URL = "tcp://localhost:61616";
private final String BROKER_USERNAME = "admin";
private final String BROKER_PASSWORD = "admin";
public static void main(String[] args) throws Exception {
final ConfigurableApplicationContext context = SpringApplication.run(Application.class);
System.out.println("********************* Sending message...");
JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
JmsTemplate jmsTemplateAnycast = context.getBean("jmsTemplateAnycast", JmsTemplate.class);
JmsTemplate jmsTemplateMulticast = context.getBean("jmsTemplateMulticast", JmsTemplate.class);
jmsTemplateAnycast.convertAndSend("VirtualTopic.simple", "Hello world anycast!");
jmsTemplate.convertAndSend("anycast://VirtualTopic.simple", "Hello world anycast using prefix!");
jmsTemplateMulticast.convertAndSend("VirtualTopic.simple", "Hello world multicast!");
jmsTemplate.convertAndSend("multicast://VirtualTopic.simple", "Hello world multicast using prefix!");
System.out.print("Press any key to close the context");
System.in.read();
context.close();
}
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
connectionFactory.setUserName(BROKER_USERNAME);
connectionFactory.setPassword(BROKER_PASSWORD);
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
return template;
}
@Bean
public JmsTemplate jmsTemplateAnycast(){
JmsTemplate template = new JmsTemplate();
template.setPubSubDomain(false);
template.setConnectionFactory(connectionFactory());
return template;
}
@Bean
public JmsTemplate jmsTemplateMulticast(){
JmsTemplate template = new JmsTemplate();
template.setPubSubDomain(true);
template.setConnectionFactory(connectionFactory());
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-1");
return factory;
}
@JmsListener(destination = "Consumer.A.VirtualTopic.simple")
public void receiveMessageFromA(String message) {
System.out.println("*********************** MESSAGE RECEIVED FROM A: " + message);
}
@JmsListener(destination = "Consumer.B.VirtualTopic.simple")
public void receiveMessageFromB(String message) {
System.out.println("*********************** MESSAGE RECEIVED FROM B: " + message);
}
@JmsListener(destination = "VirtualTopic.simple")
public void receiveMessageFromTopic(String message) {
System.out.println("*********************** MESSAGE RECEIVED FROM TOPIC: " + message);
}
}