PTP模式下,异步接收消息需要定义一个MessageListener来监听,当生产者有消息要发送时会主动通知Listener去处理该消息,会调用监听的onMessage方法去处理。
首先看生产者(和同步接收时没有任何区别):
package com.thunisoft.jms.mine; import java.util.HashMap; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* JMS生产者
*
* @author zhangxsh
*
*/
public class Producer { /**
* @param args
*/
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
// 通过连接工厂创建连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 通过连接打开一个会话
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 根据特定的队列名称创建一个目标地
destination = session.createQueue("TestQueue");
// 根据目标地创建一个生产者
producer = session.createProducer(destination);
// 不需要持久化的投递模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 1; i <= 10; i++) {
ObjectMessage message = session.createObjectMessage();
HashMap m = new HashMap();
m.put("key" + i, i);
message.setObject(m);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
下面是消费者异步接收代码:
package com.thunisoft.jms.mine; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumeListener { /**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
connection.start();
Destination destination = session.createQueue("TestQueue");
MessageConsumer consumer = session.createConsumer(destination); //定义一个MessageListener用于通知有消息到来
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// System.out.println("message");
if (message instanceof ObjectMessage)
try {
System.out.println("收到消息"
+ ((ObjectMessage) message).getObject());
// session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
});
// consumer.receive(); } }