1.项目结构

activemq 生产消费模式,订阅发布模式不同类型数据传输-LMLPHP

2. activemq-pom pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>activemq</groupId>
<artifactId>activemq-pom</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>activemq-producer</module>
<module>activemq-consumer</module>
<module>activemq-common</module>
</modules> <properties>
<springframework>5.0.4.RELEASE</springframework>
<slf4j.version>1.7.7</slf4j.version>
<logback.version>1.1.2</logback.version>
<fastjson.version>1.2.44</fastjson.version>
</properties> </project>

3.activemq-common pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>activemq</groupId>
<artifactId>activemq-pom</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>activemq-common</groupId>
<artifactId>activemq-common</artifactId> <dependencies>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency> <!-- logback -->
<dependency>
<!--主要介绍的是这个jar包,这个包是负责logback随着项目启动的jar包-->
<groupId>org.logback-extensions</groupId>
<artifactId>logback-ext-spring</artifactId>
<version>0.1.4</version>
</dependency>
<!-- slf4j start -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- slf4j end --> <!-- 反射工具 -->
<dependency>
<groupId>org.db4j</groupId>
<artifactId>reflectasm</artifactId>
<version>1.11.4-2</version>
</dependency> <dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency> <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency> </dependencies> </project>

4.activemq-producer pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>activemq</groupId>
<artifactId>activemq-pom</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>activemq-producer</groupId>
<artifactId>activemq-producer</artifactId>
<packaging>war</packaging> <dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${springframework}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${springframework}</version>
</dependency>
<!-- 导入java ee jar 包 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency> <dependency>
<groupId>activemq-common</groupId>
<artifactId>activemq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

5.activemq-consumer pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>activemq</groupId>
<artifactId>activemq-pom</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>activemq-consumer</groupId>
<artifactId>activemq-consumer</artifactId>
<packaging>war</packaging> <dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${springframework}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${springframework}</version>
</dependency>
<!-- 导入java ee jar 包 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency> <dependency>
<groupId>activemq-common</groupId>
<artifactId>activemq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies> </project>

6.activemq-producer activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd"
> <!-- 配置JMS连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="10" />
<!-- 接收者ID -->
<property name="clientId" value="client_3" />
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- MQ地址 -->
<property name="brokerURL" value="tcp://192.168.64.128:61616" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
</property>
</bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="true" />
<!--转换器,我们自己可以继承重写这个类的方法 ,这里使用spring提供的默认方法 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean> </beans>

7.消息发送工具类 ProducerTest.java

package com.bonade.activemq.test;

import java.util.Date;
import java.util.HashMap;
import java.util.Map; import javax.jms.JMSException; import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.bonade.activemq.util.ProducerUtil; @RunWith(SpringJUnit4ClassRunner.class) //使用junit4进行测试
@ContextConfiguration(locations={"classpath:spring.xml"}) //加载配置文件
public class ProducerTest { @Autowired
private ProducerUtil producerUtil; @Test
public void sendTextMsg() {
try {
producerUtil.sendTextMsg("bonade.q1", "it is my world");
} catch (JMSException e) { }
} @Test
public void sendMapMsg() {
Map<String,String> param = new HashMap<>();
param.put("name", "张三");
param.put("sex", "男");
param.put("age", "23"); producerUtil.sendMapMsg("bonade.q", param);
} @Test
public void sendObjectMsg() {
Map<String,String> param = new HashMap<>();
param.put("name", "张三");
param.put("sex", "男");
param.put("age", "23"); BaseDTO dto = new BaseDTO();
dto.setBizno("CS201805110001");
dto.setDate(new Date());
producerUtil.sendObjectMsg("bonade.q", dto);
} @Test
public void sendTopicTextMsg() {
try {
producerUtil.sendTopicTextMsg("topic.q.2", "it is my world");
} catch (JMSException e) { }
} @Test
public void sendTopicMapMsg() {
Map<String,String> param = new HashMap<>();
param.put("name", "张三");
param.put("sex", "男");
param.put("age", "23"); producerUtil.sendTopicMapMsg("topic.q.2", param);
} @Test
public void sendTopicObjectMsg() {
Map<String,String> param = new HashMap<>();
param.put("name", "张三");
param.put("sex", "男");
param.put("age", "23"); BaseDTO dto = new BaseDTO();
dto.setBizno("CS201805110001");
dto.setDate(new Date());
producerUtil.sendTopicObjectMsg("topic.q.2", dto);
} }

8.activemq-consumer activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd"
> <!-- 配置JMS连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="10" />
<!-- 接收者ID -->
<property name="clientId" value="client_2" />
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- MQ地址 -->
<property name="brokerURL" value="tcp://192.168.64.128:61616" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
</property>
</bean> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="bonade.q1" />
</bean> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="queueMessageListener" />
<property name="receiveTimeout" value="10000"/>
</bean> <bean id="queueMessageListener" class="com.bonade.activemq.QueueMessageListener" /> <!-- 发送消息的目的地(一个主题) -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息主题的名字 -->
<constructor-arg index="0" value="topic.q.2" />
</bean> <!-- 消息监听容器 -->
<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<!-- 发布订阅模式 -->
<property name="pubSubDomain" value="true"/>
<!-- 消息持久化 -->
<property name="subscriptionDurable" value="true"/>
<property name="receiveTimeout" value="10000"/>
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean> </beans>

9.消费监听类 QueueMessageListener.java

package com.bonade.activemq;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; public class QueueMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(QueueMessageListener.class); public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage text = (TextMessage) message;
LOGGER.debug("收到文本消息:{}",text);
}
if(message instanceof MapMessage){
MapMessage map = (MapMessage) message;
LOGGER.debug("收到Map消息:{}",map); }
if(message instanceof ObjectMessage){
ObjectMessage objMsg = (ObjectMessage) message;
LOGGER.debug("收到Object消息:{}",JSON.toJSONString(objMsg)); }
} }
05-27 10:35