1. Composite Destinations 组合目的地
组合队列Composite Destinations : 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发送消息。
有两种实现方式:
第一种:在客户端编码实现
第二种:在activemq.xml配置文件中实现
- 第一种:在客户端编码实现
在composite destinations中,多个destination之间采用","分隔。如下:这里有2个destination "my-queue3"和"topic://topic-1",这个代表主题模式的topic-1
private static final String queueName = "my-queue3,topic://topic-1";
默认是queue模式。
package cn.qlq.activemq; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 生产消息
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "my-queue3,topic://topic-1";
private static Session session = null; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName); TemporaryQueue temporaryQueue = session.createTemporaryQueue(); // 6.创建生产者producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 5; i++) {
// 7.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i); // 设置附加属性
tms.setStringProperty("str", "stringProperties" + i);
tms.setJMSPriority(6);
tms.setJMSReplyTo(temporaryQueue); // 8.生产者发送消息
producer.send(tms);
} // 9.提交事务
session.commit(); // 10.关闭connection
session.close();
connection.close();
} }
结果会创建5条 my-queue3 队列消息 与 5条 主题模式 topic-1 消息。
消费者正常消费即可,与队列模型的消息和主题模式的消息消费一样。
- 第二种:在activemq.xml配置文件中实现
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}"> <destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="comQueue">
<forwardTo>
<queue physicalName="queue88" />
<topic physicalName="topic88" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
。。。
程序中向组合队列发送消息:
package cn.qlq.activemq; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 生产消息
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "comQueue";
private static Session session = null; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName); TemporaryQueue temporaryQueue = session.createTemporaryQueue(); // 6.创建生产者producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 5; i++) {
// 7.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i); // 设置附加属性
tms.setStringProperty("str", "stringProperties" + i);
tms.setJMSPriority(6);
tms.setJMSReplyTo(temporaryQueue); // 8.生产者发送消息
producer.send(tms);
} // 9.提交事务
session.commit(); // 10.关闭connection
session.close();
connection.close();
} }
结果:
queue88产生五条消息:
topic88生产五条消息:
2 .Configure Startup Destinations--启动创建队列和主题,只是没有消息
在启动ActiveMQ的时候如果需要创建Destination的话,可以在activemq.xml中配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
<destinations>
<queue physicalName="autoqueue" />
<topic physicalName="autotopic" />
</destinations>
...
3.Delete Inactive Destinations---删除没有消息的队列或主题
在ActiveMQ的queue在不使用之后,可以通过web控制台或者JMX方式来删除掉,当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收响应资源。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
</policyEntries>
</policyMap>
</destinationPolicy>
。。。
说明:
schedulePeriodForDestinationPurge: 设置多长时间检查一次,这里是1秒。
inactiveTimoutBeforeGC: 设置当Destination为空后,多长时间被删除,这里是30秒。
gcInactiveDestinations:设置删除掉不活动的队列,默认为false
4.wildcars(通配符)
Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
ActiveMQ支持以下三种wildcars:
. 用于作为路径上名字间的分隔符
* 用于匹配路径上的任何名字
> 用于递归地匹配任何以这个名字开始的destination
5. Destination 选项
这个是给消费者在JMS规范之外添加的功能特性,通过在队列名称后面使用类似url的语法添加多个选项。包括:
1 consumer.perfetchSize,消费者持有的未确认的最大消费数量
2 consumer.maximumPendingMessageLimit: 用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认为0
3 consumer.noLocal: 默认false
4 consumer.dispatchAsync: 是否异步分发,默认true
5 consumer.retroactive: 是否为回溯消费者,默认false
6 consumer.selector: JMS的selector,默认null
7 consumer.exclusive: 是否为独占消费者,默认false
8 consumer.priority:设置消费者的优先级,默认0
使用示例:
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=
false&consumer.perfetchSize=10");
Consumer consumer = session.createConsumer(queue);
6. 虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination.
ActiveMQ支持2种方式:
1:虚拟主题(Virtual Topics)
2:组合Destinations(Composite Destinations)
为什么使用虚拟主题?
ActiveMQ只有在持久订阅才是持久化的。持久订阅时,每一个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
第一:同一应用内消费者端护在均衡的问题。也就是说一个应用程序内的持久化消息,不能使用对个消费者共同承担消息处理能力。因为每个消费者都会获取所有消息。因为每一个消费者都会获取所有信息。
Queue到时可以解决这个问题,但broker端又不能将消息发送到多个应用端,所以纪要发布订阅,又要让消费者分组,这个功能JMS本身是没有的
第二:同一应用内消费者端failover问题,由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理,系统的健壮性不高。
如何使用虚拟topic?
第一:对于消息发布者来说,就是一个正常的topic,名称以VirtualTopic.开始,比如VirtualTopic.Orders,代码示例如下:
Topicdestination = session.createTopic("VirtualTopic.Orders");
第二:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列名称,即可表明自己的身份即可实现消费端应用分组。
例如Consumer.A.VirtualTopic.Orders说明它是名称为A的消费端,同理Consumer.B VirtualTopic.Orders说明是一名称为B的消费端。可以在同一个应用中使用多个消费者消费这个队列
又因为不同应用使用的topic名称不一样,前缀不同,所以不同应用中都可以接受到全部消息。每一个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
代码示例:
Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");
生产者代码:
package cn.qlq.activemq.topic; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class MsgProducer { private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "VirtualTopic.Orders"; public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(topicName); MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) { TextMessage tms = session.createTextMessage("textMessage:" + i); producer.send(tms); System.out.println("send:" + tms.getText());
}
connection.close();
} }
消费者代码:
package cn.qlq.activemq; import java.util.Enumeration; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "Consumer.A.VirtualTopic.Orders"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();
while (jmsxPropertyNames.hasMoreElements()) {
String nextElement = (String) jmsxPropertyNames.nextElement();
System.out.println("JMSX name ===" + nextElement);
}
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination); int i = 0;
while (i < 5) {
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println("接收消息:" + textMessage.getText() + ";属性" + textMessage.getStringProperty("str"));
i++; if (i == 5) {// 确保消费完所有的消息再进行确认
textMessage.acknowledge();
}
} // 提交事务,进行确认收到消息
session.commit(); session.close();
connection.close();
}
}
其实把消费者队列化了。
修改虚拟主题的前缀:
默认前缀是VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>
修改配置:
<broker xmlns="http://activemq.apache.org/schema/core">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>