创建MyProducer类

package cn.th.mq.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyProducer {
	// 定义链接工厂
	private ConnectionFactory connectionFactory = null;
	// 定义链接
	private Connection connection = null;
	// 定义会话
	private Session session = null;
	// 定义目的地
	private Destination destination = null;
	// 定义消息生成者
	private MessageProducer producer = null;
	// 定义消息
	private Message message = null;

	public void sendToMq() {

		try {
			//1.获得连接,要开放61616端口

			/*
			 * 创建链接工厂
			 * ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.
			 * 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
			 *  userName - 访问ActiveMQ服务的用户名, 用户名可以通过jetty-realm.properties配置文件配置.
			 *  password - 访问ActiveMQ服务的密码, 密码可以通过jetty-realm.properties配置文件配置.
			 *  brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号
			 *      此链接基于TCP/IP协议.
			 */
			connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.189.101:61616");
			connection = connectionFactory.createConnection();
			connection.start();
			//2.构建操作对象
			/*
			 * 创建会话对象
			 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode);
			 *  transacted - 是否使用事务, 可选值为true|false
			 *      true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为
			 *          Session.SESSION_TRANSACTED
			 *      false - 不使用事务, 设置此变量值, 则acknowledgeMode参数必须设置.
			 *  acknowledgeMode - 消息确认机制, 可选值为:
			 *      Session.AUTO_ACKNOWLEDGE - 自动确认消息机制
			 *      Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
			 *      Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
			 */
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//3.构建消息的目的地
			destination = session.createQueue("test-mq");
			//4.构建消息的发送者
			producer = session.createProducer(destination);
			//5.构建消息
			message = session.createTextMessage("hello activemq");
			//6.发送
			producer.send(message);
			//7.关闭
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}



}

创建测试类ActiveMQText,用于将消息发送到ActiveMQ中

package cn.th.activemqText;

import static org.junit.Assert.*;

import org.junit.Test;

import cn.th.mq.producer.MyProducer;

public class ActiveMQText {


	@Test
	public void testName() throws Exception {
		MyProducer myProducer = new MyProducer();
		myProducer.sendToMq();
	}
}

JavaAPI-----ActiveMQ实例(二)-LMLPHP

创建MyConsumer类,用于接收数据

package cn.th.mq.Consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyConsumer {

	// 定义链接工厂
	private ConnectionFactory connectionFactory = null;
	// 定义链接
	private Connection connection = null;
	// 定义会话
	private Session session = null;
	// 定义目的地
	private Destination destination = null;
	// 定义消息消费者
	private MessageConsumer consumer = null;
	// 定义消息
	private Message message = null;

	public void recieveFromMQ() {

		try {
			//1.获得连接
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.189.101:61616");
			connection = connectionFactory.createConnection();
			connection.start();
			//2.获得操作对象
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//3.获得目的地
			destination = session.createQueue("test-mq");
			//4.获得消费者
			consumer = session.createConsumer(destination);
			//5.获得消息
			message = consumer.receive();
			TextMessage textMessage = (TextMessage) message;
			System.out.println("接收到的消息"+textMessage);
			//6.关闭
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}
}

创建接受测试类

package cn.th.activemqText;

import org.junit.Test;

import cn.th.mq.Consumer.MyConsumer;

public class ConsumerText {

	@Test
	public void testName() throws Exception {
		MyConsumer myConsumer =new MyConsumer();
		myConsumer.recieveFromMQ();
	}
}

JavaAPI-----ActiveMQ实例(二)-LMLPHP

JavaAPI-----ActiveMQ实例(二)-LMLPHP

07-30 10:30