首先xxl-mq是大神xuxueli开发的一个消息中间件框架:
与springboot整合过程:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the xxl-mq Spring Boot sample application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-mq-samples</artifactId>
        <!-- NOTE(review): the patch digit was missing in the original text; 1.3.0-SNAPSHOT is assumed.
             Confirm against the xxl-mq-samples parent POM. -->
        <version>1.3.0-SNAPSHOT</version>
    </parent>

    <artifactId>xxl-mq-samples-springboot</artifactId>
    <packaging>jar</packaging>

    <dependencyManagement>
        <dependencies>
            <!-- Import Spring Boot's managed dependency versions (the project parent is xxl-mq-samples). -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- starter-web:spring-webmvc + autoconfigure + logback + yaml + tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- starter-test:junit + spring-test + mockito -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- freemarker-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>

        <!-- xxl-mq-client: same version as the parent project -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-mq-client</artifactId>
            <!-- ${parent.version} is deprecated; use the project.* form. -->
            <version>${project.parent.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Repackage the jar as an executable Spring Boot archive. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2 properties 配置文件(application.properties):
### web
# NOTE(review): the port value was missing in the original text; 8081 is assumed so that the
# sample does not clash with the admin on 8080 (see xxl.mq.admin.address below). Confirm.
server.port=8081
server.context-path=/

### resources
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

### xxl-mq, admin conf: the address where xxl-mq-admin is deployed
xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin

### xxl-mq, access token
xxl.mq.accessToken=
3 页面 index.html(FreeMarker 模板):
<script src="${request.contextPath}/static/jquery/jquery.min.js"></script>
<body>

<!--
  Each button posts its _type attribute to /produce as the "type" parameter.
  NOTE(review): the _type values were missing in the original text; 0-4 are assumed,
  following the button order. Confirm against the /produce handler.
-->
<input type="button" class="send" _type="0" value="并行消费" />
<br><br>
<input type="button" class="send" _type="1" value="串行消费" />
<br><br>
<input type="button" class="send" _type="2" value="广播消息" />
<br><br>
<input type="button" class="send" _type="3" value="延时消息:5分钟后执行" />
<br><br>
<input type="button" class="send" _type="4" value="性能测试:批量发送10000条消息" />

<hr>
<div id="console"></div>

<script>
    $(function () {
        // Send the clicked button's type to the server and prepend the outcome to the console div.
        $(".send").click(function () {
            var _type = $(this).attr("_type");
            $.post('${request.contextPath}/produce', {'type': _type}, function (data, status) {
                var temp = "<br>" + new Date().format("yyyy-MM-dd HH:mm:ss") + ": ";
                temp += ("SUCCESS" == data) ? ('成功发送一条消息!') : data;
                $("#console").prepend(temp);
            });
        });
    });

    /**
     * Formats this date according to a pattern such as "yyyy-MM-dd HH:mm:ss".
     *
     * Supported tokens: y (year), M (month), d (day), H and h (hours, both from getHours()),
     * m (minutes), s (seconds), q (quarter), S (milliseconds). A single-letter token is
     * replaced by the raw value; a repeated token is zero-padded to two digits.
     *
     * @param fmt pattern string
     * @returns the pattern with the first run of each token replaced
     */
    Date.prototype.format = function (fmt) {
        var fields = {
            "M+": this.getMonth() + 1,                   // month (getMonth() is 0-based)
            "d+": this.getDate(),                        // day of month
            "H+": this.getHours(),                       // hours; the "HH" used by the caller above was previously unmapped
            "h+": this.getHours(),                       // hours (kept for existing "hh" patterns)
            "m+": this.getMinutes(),                     // minutes
            "s+": this.getSeconds(),                     // seconds
            "q+": Math.floor((this.getMonth() + 3) / 3), // quarter
            "S": this.getMilliseconds()                  // milliseconds
        };

        // Year: "yyyy" yields all four digits, "yy" the last two.
        var year = String(this.getFullYear());
        fmt = fmt.replace(/(y+)/, function (token) {
            return year.substr(4 - token.length);
        });

        for (var key in fields) {
            if (!fields.hasOwnProperty(key)) {
                continue;
            }
            (function (value) {
                fmt = fmt.replace(new RegExp("(" + key + ")"), function (token) {
                    // One-letter token: raw value. Longer token: left-pad with zeros to two digits.
                    return (token.length == 1) ? value : ("00" + value).substr(value.length);
                });
            })(String(fields[key]));
        }
        return fmt;
    };
</script>
</body>
上面的脚本需要放在 jQuery 的引入之后:
日志管理配置:
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logging setup: a console appender plus a daily-rolling file appender, both attached to root at INFO. -->
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>
    <property name="log.path" value="/data/applogs/xxl-mq/xxl-mq-samples-springboot.log"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- NOTE(review): the logger length option was missing in the original text; 36 is assumed. Confirm. -->
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <!-- Roll over once per day; the .zip suffix makes logback compress rolled files. -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
            </pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>

</configuration>
配置XxlMqConf.java:
package com.xxl.mq.sample.springboot.conf;

import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Spring configuration that exposes the xxl-mq client factory as a bean.
 *
 * <p>The admin address and access token are read from the {@code xxl.mq.admin.address}
 * and {@code xxl.mq.accessToken} properties.
 */
@Component
public class XxlMqConf {

    // ---------------------- param ----------------------

    // The annotation must sit on its own line: when it trails the line comment above,
    // it becomes part of that comment and the field is never injected.
    @Value("${xxl.mq.admin.address}")
    private String adminAddress;

    @Value("${xxl.mq.accessToken}")
    private String accessToken;

    /**
     * Builds the client factory from the injected properties.
     *
     * @return a factory configured with the admin address and access token
     */
    @Bean
    public XxlMqSpringClientFactory getXxlMqConsumer() {
        XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory();
        xxlMqSpringClientFactory.setAdminAddress(adminAddress);
        xxlMqSpringClientFactory.setAccessToken(accessToken);
        return xxlMqSpringClientFactory;
    }
}
controller 根据传入的参数进行设置:
import com.xxl.mq.client.message.XxlMqMessage;
import com.xxl.mq.client.producer.XxlMqProducer;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody; import java.util.Calendar;
import java.util.Date; /**
* index controller
* @author xuxueli 2015-12-19 16:13:16
*/
@Controller
public class IndexController { @RequestMapping("/")
public String index(){
return "index";
} @RequestMapping("/produce")
@ResponseBody
public String produce(int type){ String topic = "topic_1";
String data = "时间戳:" + System.currentTimeMillis(); if (type == ) { /**
* 并行消费
*/
XxlMqProducer.produce(new XxlMqMessage(topic, data)); } else if (type == ) { /**
* 串行消费
*/
XxlMqProducer.produce(new XxlMqMessage(topic, data, 1L)); } else if (type == ) { /**
* 广播消费
*/
XxlMqProducer.broadcast(new XxlMqMessage(topic, data)); } else if (type == ) { Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, );
Date effectTime = calendar.getTime(); /**
* 延时消息
*/
XxlMqProducer.produce(new XxlMqMessage(topic, data, effectTime)); } else if (type == ) { int msgNum = ;
long start = System.currentTimeMillis();
for (int i = ; i < msgNum; i++) {
XxlMqProducer.produce(new XxlMqMessage("topic_1", "No:"+i));
}
long end = System.currentTimeMillis();
return "Cost = " + (end-start); } else {
return "Type Error.";
} return "SUCCESS";
} @ExceptionHandler({Exception.class})
public String exception(Exception e) {
e.printStackTrace();
return e.getMessage();
} }
对应的消费者:
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Demo consumer bound to {@code topic_2}; logs each message and reports success.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_2")
@Service
public class Demo2MqComsumer implements IMqConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2MqComsumer.class);

    /**
     * Logs the received payload.
     *
     * @param data the message payload
     * @return always {@link MqResult#SUCCESS}
     */
    @Override
    public MqResult consume(String data) throws Exception {
        LOGGER.info("[Demo2MqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }
}
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Demo consumer bound to {@code topic_1}; logs each message and reports success.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_1")
@Service
public class DemoAMqComsumer implements IMqConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoAMqComsumer.class);

    /**
     * Logs the received payload.
     *
     * @param data the message payload
     * @return always {@link MqResult#SUCCESS}
     */
    @Override
    public MqResult consume(String data) throws Exception {
        LOGGER.info("[DemoAMqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }
}
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Second demo consumer bound to {@code topic_1}; logs each message and reports success.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_1")
@Service
public class DemoBMqComsumer implements IMqConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoBMqComsumer.class);

    /**
     * Logs the received payload.
     *
     * @param data the message payload
     * @return always {@link MqResult#SUCCESS}
     */
    @Override
    public MqResult consume(String data) throws Exception {
        LOGGER.info("[DemoBMqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }
}
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Demo consumer bound to {@code topic_1} with {@code group = MqConsumer.EMPTY_GROUP};
 * logs each message and reports success.
 *
 * <p>NOTE(review): the empty group presumably sets this consumer apart from the other
 * {@code topic_1} consumers (e.g. for broadcast delivery) - confirm against the xxl-mq documentation.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_1", group = MqConsumer.EMPTY_GROUP)
@Service
public class DemoCMqComsumer implements IMqConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoCMqComsumer.class);

    /**
     * Logs the received payload.
     *
     * @param data the message payload
     * @return always {@link MqResult#SUCCESS}
     */
    @Override
    public MqResult consume(String data) throws Exception {
        LOGGER.info("[DemoCMqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }
}