1.2 JMS应用程序接口
Connection:
Session:
MessageConsumer:
MessageProducer:
Message:
2.Hello World
2.0 基本配置
使用Maven,pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.</modelVersion> <groupId>org.ygy</groupId>
<artifactId>activemq</artifactId>
<version>0.0.-SNAPSHOT</version>
<packaging>jar</packaging> <name>activemq</name>
<url>http://maven.apache.org</url> <properties>
<project.build.sourceEncoding>UTF-</project.build.sourceEncoding>
</properties> <dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency> <!-- activemq,学习中 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.</version>
</dependency> <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.5.</version>
</dependency> <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.</version>
</dependency> </dependencies>
</project>
这里只要引入ActiveMQ的依赖就可以了。
2.1 P2P版的HelloWorld
生产者:HelloQueueProducer
package org.ygy.mq.lesson01; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 最简单的生产者
*
* @author yuguiyang
*
*/
public class HelloQueueProducer {
public static void main(String[] args) {
// 生产者的主要流程
Connection connection = null; try {
// 1.初始化connection工厂,使用默认的URL
//failover://tcp://localhost:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 2.创建Connection
connection = connectionFactory.createConnection(); // 3.打开连接
connection.start(); // 4.创建Session,(是否支持事务)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建消息目标
Destination destination = session.createQueue("queue_lesson"); //6.创建生产者
MessageProducer producer = session.createProducer(destination); //7.配置消息是否持久化
/* DeliverMode有2种方式:
*
public interface DeliveryMode {
static final int NON_PERSISTENT = 1;//不持久化:服务器重启之后,消息销毁 static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在
}
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //8.初始化要发送的消息
TextMessage message = session.createTextMessage("Hello World ! from yuguiyang"); //9.发送消息
producer.send(message); } catch (JMSException e) {
e.printStackTrace();
} finally{
try {
//10.关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消费者:HelloQueueConsumer
package org.ygy.mq.lesson01; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 简单的消费者
*
* @author yuguiyang
*
*/
public class HelloQueueConsumer implements MessageListener { @Override
public void onMessage(Message message) {
//如果消息是TextMessage
if (message instanceof TextMessage) {
//强制转换一下
TextMessage txtMsg = (TextMessage) message;
try {
//输出接收到的消息
System.out.println("HaHa: I'v got " + txtMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
} public void receive() {
// 消费者的主要流程
Connection connection = null; try {
// 1.初始化connection工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 2.创建Connection
connection = connectionFactory.createConnection(); // 3.打开连接
connection.start(); // 4.创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建消息目标
Destination destination = session.createQueue("queue_lesson"); //6.创建消费者
MessageConsumer consumer = session.createConsumer(destination); //7.配置监听
consumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
}
} public static void main(String[] args) {
new HelloQueueConsumer().receive();
} }
3.测试
代码写好了,我们测试一下
3.1 启动ActiveMQ服务器
上一篇博客中,说过,进入到bin目录下,双击 activemq.bat,启动
启动后,访问 http://localhost:8161/admin/
可能会让你输入用户名和密码 ,这里默认的用户名:admin;密码:admin
然后,我们单击那个 Queues菜单:
这里默认应该什么都没有,有的话,也没事
3.2运行程序
我们先运行生产者,运行完之后,刷新一下,上面的界面:
可以看到上面的记录
这里显示的是服务器上的队列,
在这里,我们单击Browse连接:
在这里,我们能看到当前队列中的消息
下面,我们点击Message ID 连接,进入到消息的详细界面:
这里,可以看到,消息的内容和消息头信息
好了,到这里,我们就可以运行消费者了,先回到最开始的界面:
运行消费者,之后,控制台输出:
我们接受到了消息。
刷新界面:
可以看到,这里的内容变了,
因为消息被我们消费了,所以被消费消息加1,而且,当前消费者还在运行,所以有一个消费者。