这是工作中用过的一个 RabbitMQ 示例。项目最终没有采用它,而是改用了 CMQ。顺便说一句,CMQ 的社区资料确实很少,功能也比较少。
一、消息体
package com.bootdo.common.rabbitmq.batchsendsms; import com.alibaba.fastjson.JSON;
import com.bootdo.common.utils.UUIDGenerator; import java.io.Serializable;
import java.util.Date;
import java.util.Map; /**
* MQ消息体
* @author yuduojia
* @date 2018/8/1 10:42
*/
public class BacthSendSMSMessage implements Serializable{
private static final long serialVersionUID = 1L; private Integer intRun;//当前次数
private Integer total;//总次数
private String productCode; // 生产者代码
private String consumerCode; // 消费者代码
private String messageId; // 消息唯一标识
private Date created; // 消息发送时间
private Map<String, Object> bussinessBody; // 消息体,封装业务数据 private BacthSendSMSMessage() {
super();
} public BacthSendSMSMessage(Integer intRun, Integer total, String productCode, String consumerCode, Map<String, Object> bussinessBody) {
this.intRun = intRun;
this.total = total;
this.productCode = productCode;
this.consumerCode = consumerCode;
this.bussinessBody = bussinessBody;
} public static String productMQMessage(Integer intRun, Integer total, String productCode, String consumerCode, Map<String, Object> bussinessBody) {
BacthSendSMSMessage mqObj = new BacthSendSMSMessage(intRun, total, productCode, consumerCode, bussinessBody);
mqObj.setCreated(new Date());
mqObj.setMessageId(generatSeriaeNo());
return JSON.toJSONString(mqObj);
} //生成消息唯一标识
private static String generatSeriaeNo() {
return UUIDGenerator.generate();
} public String getProductCode() {
return productCode;
} public void setProductCode(String productCode) {
this.productCode = productCode;
} public String getConsumerCode() {
return consumerCode;
} public void setConsumerCode(String consumerCode) {
this.consumerCode = consumerCode;
} public String getMessageId() {
return messageId;
} public void setMessageId(String messageId) {
this.messageId = messageId;
} public Date getCreated() {
return created;
} public void setCreated(Date created) {
this.created = created;
} public Map<String, Object> getBussinessBody() {
return bussinessBody;
} public void setBussinessBody(Map<String, Object> bussinessBody) {
this.bussinessBody = bussinessBody;
} public static long getSerialVersionUID() {
return serialVersionUID;
} public Integer getIntRun() {
return intRun;
} public void setIntRun(Integer intRun) {
this.intRun = intRun;
} public Integer getTotal() {
return total;
} public void setTotal(Integer total) {
this.total = total;
} @Override
public String toString() {
return "BacthSendSMSMessage{" +
"intRun=" + intRun +
", total=" + total +
", productCode=" + productCode +
", consumerCode=" + consumerCode +
", messageId='" + messageId + '\'' +
", created=" + created +
", bussinessBody=" + bussinessBody +
'}';
}
}
二、发布者
package com.bootdo.common.rabbitmq.batchsendsms; import com.bootdo.server.vo.SendModel;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import java.util.List;
import java.util.Map;
import java.util.UUID; @Component
public class BatchSendSMSSender {
    private static final Logger logger = LoggerFactory.getLogger(BatchSendSMSSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one JSON message per entry of {@code list}.
     *
     * <p>Each message carries a fixed set of business fields plus the entry's template
     * code, template parameters and phone number. A random UUID is used both as the AMQP
     * correlation id and as the id of the {@link CorrelationData} handed to the template,
     * so that a publisher-confirm callback can tell which message was (n)acked.
     *
     * @param list    the SMS requests to publish; {@code null} or empty publishes nothing
     * @param channel value stored under the {@code "channel"} key of every payload
     */
    public void send(List<SendModel> list, String channel) {
        if (list == null || list.isEmpty()) {
            return;
        }
        final int total = list.size();
        for (int i = 0; i < total; i++) {
            SendModel item = list.get(i);

            Map<String, Object> bussiness = Maps.newHashMap();
            bussiness.put("productId", 15);
            bussiness.put("companyId", 100); // id of company B
            bussiness.put("isSmallPerson", 1); // 1 or 0
            bussiness.put("assignType", 1);
            bussiness.put("bookNum", 1);
            bussiness.put("bookAmount", 100);
            bussiness.put("channel", channel);
            bussiness.put("templeteCode", item.getTempleteCode());
            bussiness.put("templeteParam", item.getTempleteParam());
            bussiness.put("phone", item.getPhone());

            String msgId = UUID.randomUUID().toString();
            String messageBody = BacthSendSMSMessage.productMQMessage(i + 1, total, "pro", "pro", bussiness);
            Message message = MessageBuilder.withBody(messageBody.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .setCorrelationIdString(msgId)
                    .build();

            // Bind msgId to the CorrelationData and actually pass it along: without it the
            // confirm callback is invoked with a null CorrelationData.
            CorrelationData correlationData = new CorrelationData(msgId);
            // NOTE(review): exchange name and routing key are literals here; confirm they
            // match the exchange/bindings declared by the application's RabbitMQ config.
            rabbitTemplate.convertAndSend("exchange", "ttd.trust.product", message, correlationData);
            logger.info("ProductTopicSender : {}", messageBody);
        }
    }
}
三、订阅者(监听)
package com.bootdo.common.rabbitmq.batchsendsms; import com.alibaba.fastjson.JSON; import com.bootdo.common.config.ConfigConstants;
import com.bootdo.server.service.SMSRetryProxy;
import com.bootdo.server.service.SMSService;
import com.bootdo.server.service.SMSToSend;
import com.bootdo.sms.domain.ModelDO;
import com.bootdo.sms.domain.SmsSendLogDO;
import com.bootdo.sms.service.ModelService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component; import java.util.HashMap;
import java.util.List;
import java.util.Map; /**
* MQ消息监听
* @author yuduojia
* @date 2018/7/30 14:53
*/
@Component
// NOTE(review): this class declares no @RabbitHandler method, while messageContainer()
// registers its own consumer on the same queue — confirm the class-level
// @RabbitListener below is really intended.
@RabbitListener(queues = "ttd.trust.product")
public class BctchSendSMSReceiver {
    private final Logger logger = LoggerFactory.getLogger(BctchSendSMSReceiver.class);

    @Autowired
    private ModelService modelService;

    @Autowired
    private ConnectionFactory connectionff;

    /**
     * Builds a single-consumer listener container on queue {@code ttd.trust.product}
     * using manual acknowledgements; every delivery is processed by
     * {@link #handleDelivery(Message, Channel)}.
     *
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionff);
        container.setQueueNames("ttd.trust.product");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // acknowledgements are sent by hand below
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                handleDelivery(message, channel);
            }
        });
        return container;
    }

    /**
     * Processes one delivery: parses the JSON body, renders the SMS text from the stored
     * template, sends the SMS, runs the event handlers and acknowledges the message.
     *
     * <p>On any failure a first-time delivery is returned to the queue for one retry;
     * a delivery that was already redelivered is rejected WITHOUT requeue, so a message
     * that keeps failing cannot loop forever.
     *
     * @param message the received AMQP message
     * @param channel the channel used to ack/nack/reject the delivery
     * @throws Exception if acknowledging or rejecting the delivery itself fails
     */
    private void handleDelivery(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String jsonString = new String(message.getBody());
            logger.info("消费端接收到消息:{}:{}", message.getMessageProperties(), jsonString);
            logger.info("topic:{}", message.getMessageProperties().getReceivedRoutingKey());
            logger.info("BctchSendSMSReceiver : {}", jsonString);

            BacthSendSMSMessage msg = JSON.parseObject(jsonString, BacthSendSMSMessage.class);
            Map<String, Object> bussinessBody = msg.getBussinessBody();
            @SuppressWarnings("unchecked") // payload is produced by the sender as a list of strings
            List<String> templateParams = (List<String>) bussinessBody.get("templeteParam");
            String phone = (String) bussinessBody.get("phone");
            String templateCode = (String) bussinessBody.get("templeteCode");

            ModelDO modelDO = modelService.getbyCode(templateCode);
            if (modelDO == null) {
                // Fail explicitly instead of with an anonymous NullPointerException.
                throw new IllegalStateException("No SMS template found for code " + templateCode);
            }
            String model = renderTemplate(modelDO.getSendModel(), templateParams);

            SMSToSend toDo = new SMSRetryProxy().getInstance(new SMSService());
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("channel", (String) bussinessBody.get("channel"));
            map.put("sendTemplateCode", templateCode);
            map.put("templeteContents", templateParams);
            map.put("mobile", phone);
            map.put("apikey", ConfigConstants.SEND_SMS_APIKEY2);
            map.put("text", model);
            map.put("smsProvider", "云片");
            Map<String, Object> return1 = toDo.singleSend(map);
            logger.info("SendSMSController——sendSMS——短信发送详细信息为:{}", return1);

            // The message is acknowledged whatever the handlers return; the after-handler
            // (logging) only runs when both the pre- and post-handler succeed.
            if (preEventHandler(msg) && postEventHandler(msg)) {
                afterEventHandler(msg);
            }
            channel.basicAck(deliveryTag, false); // confirm exactly this one delivery
        } catch (Exception e) {
            // NOTE(review): if the failure happens after singleSend(), the retry below
            // will send the SMS a second time.
            if (Boolean.TRUE.equals(message.getMessageProperties().getRedelivered())) {
                logger.error("消息已重复处理失败,拒绝再次接收__{}", deliveryTag, e);
                channel.basicReject(deliveryTag, false); // drop it: requeue=false stops the redelivery loop
            } else {
                logger.error("消息即将再次返回队列处理__{}", deliveryTag, e);
                channel.basicNack(deliveryTag, false, true); // requeue once for a retry
            }
        }
    }

    /**
     * Renders an SMS text from a template in which placeholders are delimited by
     * {@code #}: after splitting on {@code #}, even-indexed segments are copied verbatim
     * and the k-th odd-indexed segment is replaced by {@code params.get(k)}.
     *
     * @param template the template text
     * @param params   replacement values, consumed in order
     * @return the rendered text
     */
    private static String renderTemplate(String template, List<?> params) {
        String[] segments = template.split("#");
        StringBuilder rendered = new StringBuilder();
        for (int i = 0; i < segments.length; i++) {
            if (i % 2 == 0) {
                rendered.append(segments[i]);
            } else {
                rendered.append(params.get((i - 1) / 2));
            }
        }
        return rendered.toString();
    }

    /**
     * Records a log entry for the message. The persistence code is disabled, so this
     * currently only writes a log line; {@code state} is unused.
     *
     * @param message the processed message
     * @param state   the state to record
     */
    private void recordLogMQ(BacthSendSMSMessage message, Integer state) {
        /*LogMqMessage log = new LogMqMessage();
        log.setMessageId(message.getMessageId());
        log.setProductCode(message.getProductCode());
        log.setConsumerCode(message.getConsumerCode());
        log.setEvent(message.getEvent());
        log.setBussinessBody(JSON.toJSONString(message));
        log.setState(state);
        logMqMessageService.insertEntry(log);*/
        logger.info("记录日志");
    }

    /**
     * Message body check. Currently a stub that always returns {@code false}, which
     * means the post- and after-handlers are never reached.
     *
     * @param message the parsed message
     * @return {@code true} if processing should continue with the post-handler
     */
    private boolean preEventHandler(BacthSendSMSMessage message) {
        return false;
    }

    /**
     * Business processing. Currently a stub that always returns {@code true}.
     *
     * @param message the parsed message
     * @return {@code true} if the after-handler should run
     */
    private boolean postEventHandler(BacthSendSMSMessage message) {
        return true;
    }

    /**
     * Records the message log with state {@code 1}.
     *
     * @param message the parsed message
     */
    private void afterEventHandler(BacthSendSMSMessage message) {
        recordLogMQ(message, 1);
    }
}
四、RabbitMQConfig
package com.bootdo.common.rabbitmq.batchsendsms.config; import com.bootdo.common.rabbitmq.confirm.MsgSendConfirmCallBack;
import com.bootdo.common.rabbitmq.confirm.MsgSendReturnCallback;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* @author yuduojia
* @date 2018/8/3 15:14
*/
@Configuration
public class RabbitMqConfig {

    public static final String ROUTING_KEY_1 = "batch_send_key1";
    public static final String ROUTING_KEY_2 = "batch_send_key2";

    @Autowired
    private QueueConfig queueConfig;

    @Autowired
    private ExchangeConfig exchangeConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * Binds the first queue to the direct exchange with {@link #ROUTING_KEY_1}.
     *
     * @return the binding
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(queueConfig.firstQueue())
                .to(exchangeConfig.directExchange())
                .with(RabbitMqConfig.ROUTING_KEY_1);
    }

    /**
     * Binds the second queue to the direct exchange with {@link #ROUTING_KEY_2}.
     *
     * @return the binding
     */
    @Bean
    public Binding binding_two() {
        return BindingBuilder.bind(queueConfig.secondQueue())
                .to(exchangeConfig.directExchange())
                .with(RabbitMqConfig.ROUTING_KEY_2);
    }

    /**
     * Defines the {@link RabbitTemplate} used to send and receive messages, with the
     * confirm callback, the return callback and the mandatory flag set.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // A custom message converter could be installed here, e.g.
        // template.setMessageConverter(new Jackson2JsonMessageConverter());
        // the default one uses JDK serialization, so message objects must be Serializable.

        // NOTE(review): confirm callbacks presumably also require publisher confirms to be
        // enabled on the connection factory, which is configured outside this class.
        template.setConfirmCallback(msgSendConfirmCallBack());
        template.setReturnCallback(msgSendReturnCallback());
        template.setMandatory(true);
        return template;
    }

    /**
     * @return the callback invoked when the broker confirms (or refuses) a published message
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack() {
        return new MsgSendConfirmCallBack();
    }

    /**
     * @return the callback invoked when a published message is returned by the broker
     */
    @Bean
    public MsgSendReturnCallback msgSendReturnCallback() {
        return new MsgSendReturnCallback();
    }
}
五、QueueConfig
package com.bootdo.common.rabbitmq.batchsendsms.config; import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import java.util.HashMap;
import java.util.Map; /**
* 队列配置 可以配置多个队列
* @author zhuzhe
* @date 2018/5/25 13:25
* @email [email protected]
*/
@Configuration
public class QueueConfig {

    /* Queue names */
    public static final String BATCH_SEND_QUEUE_NAME1 = "batch_send_queue1";
    public static final String BATCH_SEND_QUEUE_NAME2 = "batch_send_queue2";
    //public static final String BATCH_SEND_QUEUE_NAME3 = "batch_send_queue3";

    /**
     * Declares the first batch-send queue: durable, non-exclusive, not auto-deleted.
     *
     * @return the queue definition
     */
    @Bean
    public Queue firstQueue() {
        return new Queue(BATCH_SEND_QUEUE_NAME1, true, false, false);
    }

    /**
     * Declares the second batch-send queue: durable, non-exclusive, not auto-deleted.
     *
     * @return the queue definition
     */
    @Bean
    public Queue secondQueue() {
        return new Queue(BATCH_SEND_QUEUE_NAME2, true, false, false);
    }

    /*@Bean
    public Queue thirdQueue() {
        return new Queue(BATCH_SEND_QUEUE_NAME3,true,false,false);
    }*/
}
六、ExchangeConfig
package com.bootdo.common.rabbitmq.batchsendsms.config; import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* 消息交换机配置 可以配置多个
* @author zhuzhe
* @date 2018/5/25 15:40
* @email [email protected]
*/
@Configuration
public class ExchangeConfig {

    /** Name of the direct exchange declared by this configuration. */
    public final String EXCHANGE_01 = "batch_send_exchange";

    /**
     * Declares the direct exchange named {@link #EXCHANGE_01}: durable and not
     * auto-deleted.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_01, durable, autoDelete);
    }
}
七、确认后回调
package com.bootdo.common.rabbitmq.confirm; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData; /**
* 确认后回调 message2exchange
* @author yuduojia
* @date 2018/8/2 15:49
*/
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    /**
     * Prints the outcome of a publisher confirm.
     *
     * @param correlationData correlation data supplied when the message was sent; may be
     *                        {@code null} when the sender did not provide any
     * @param ack             {@code true} if the message was confirmed, {@code false} otherwise
     * @param cause           the cause reported for the outcome, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息确认成功cause:" + cause);
            return;
        }
        // Handle the lost message. The id is read null-safely because messages sent
        // without CorrelationData reach this callback with a null argument.
        String id = (correlationData == null) ? null : correlationData.getId();
        System.out.println("消息确认失败:" + id + "#cause:" + cause);
    }
}
八、失败后回调
package com.bootdo.common.rabbitmq.confirm; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate; /**
* 失败后return回调 exchange2queue
* @author yuduojia
* @date 2018/8/2 15:48
*/
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {

    /**
     * Prints the details of a message that was returned to the publisher.
     *
     * @param message    the returned message
     * @param replyCode  the reply code supplied with the return
     * @param replyText  the reply text supplied with the return
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        String payload = new String(message.getBody());
        StringBuilder line = new StringBuilder();
        line.append("确认后回调return--message:").append(payload);
        line.append(",replyCode:").append(replyCode);
        line.append(",replyText:").append(replyText);
        line.append(",exchange:").append(exchange);
        line.append(",routingKey:").append(routingKey);
        System.out.println(line.toString());
    }
}
如果要在实际项目中使用,这份代码还需要优化;我这里还没来得及优化,方案就被砍掉了,很遗憾,等下次用到时再更新。确认回调机制可以用 Redis 来实现。
如果有疑问或者建议欢迎评论。