创建MyProducer类
package cn.th.mq.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyProducer {
// 定义链接工厂
private ConnectionFactory connectionFactory = null;
// 定义链接
private Connection connection = null;
// 定义会话
private Session session = null;
// 定义目的地
private Destination destination = null;
// 定义消息生成者
private MessageProducer producer = null;
// 定义消息
private Message message = null;
public void sendToMq() {
try {
//1.获得连接,要开放61616端口
/*
* 创建链接工厂
* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.
* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* userName - 访问ActiveMQ服务的用户名, 用户名可以通过jetty-realm.properties配置文件配置.
* password - 访问ActiveMQ服务的密码, 密码可以通过jetty-realm.properties配置文件配置.
* brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号
* 此链接基于TCP/IP协议.
*/
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.189.101:61616");
connection = connectionFactory.createConnection();
connection.start();
//2.构建操作对象
/*
* 创建会话对象
* 方法 - connection.createSession(boolean transacted, int acknowledgeMode);
* transacted - 是否使用事务, 可选值为true|false
* true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为
* Session.SESSION_TRANSACTED
* false - 不使用事务, 设置此变量值, 则acknowledgeMode参数必须设置.
* acknowledgeMode - 消息确认机制, 可选值为:
* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制
* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//3.构建消息的目的地
destination = session.createQueue("test-mq");
//4.构建消息的发送者
producer = session.createProducer(destination);
//5.构建消息
message = session.createTextMessage("hello activemq");
//6.发送
producer.send(message);
//7.关闭
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
创建测试类ActiveMQText,用于将消息发送到ActiveMQ中
package cn.th.activemqText;
import static org.junit.Assert.*;
import org.junit.Test;
import cn.th.mq.producer.MyProducer;
public class ActiveMQText {
@Test
public void testName() throws Exception {
MyProducer myProducer = new MyProducer();
myProducer.sendToMq();
}
}
创建MyConsumer类,用于接收数据
package cn.th.mq.Consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyConsumer {
// 定义链接工厂
private ConnectionFactory connectionFactory = null;
// 定义链接
private Connection connection = null;
// 定义会话
private Session session = null;
// 定义目的地
private Destination destination = null;
// 定义消息消费者
private MessageConsumer consumer = null;
// 定义消息
private Message message = null;
public void recieveFromMQ() {
try {
//1.获得连接
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.189.101:61616");
connection = connectionFactory.createConnection();
connection.start();
//2.获得操作对象
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//3.获得目的地
destination = session.createQueue("test-mq");
//4.获得消费者
consumer = session.createConsumer(destination);
//5.获得消息
message = consumer.receive();
TextMessage textMessage = (TextMessage) message;
System.out.println("接收到的消息"+textMessage);
//6.关闭
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
创建接受测试类
package cn.th.activemqText;
import org.junit.Test;
import cn.th.mq.Consumer.MyConsumer;
public class ConsumerText {
@Test
public void testName() throws Exception {
MyConsumer myConsumer =new MyConsumer();
myConsumer.recieveFromMQ();
}
}