🌟架构图
🌟application.properties
redundancy.mq.redundancy-event-exchange=redundancy.event.exchange
redundancy.mq.add-routing-key=redundancy.add.business.consumer.routing.key
redundancy.mq.add-business-binding-key=redundancy.add.business.*.routing.key
redundancy.mq.add-consumer-binding-key=redundancy.add.*.consumer.routing.key
redundancy.mq.add-business-queue=redundancy.add.business.queue
redundancy.mq.add-consumer-queue=redundancy.add.consumer.queue
🌟RabbitMQ配置
package top.daencode.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Copyright (C) 2023-11-29 智源恩创网络科技工作室
*
* @BelongsProject: architecture-solution
* @BelongsPackage: top.daencode.mq
* @author: DaenCode
* @createTime: 2023-11-29 15:08
* @description: TODO
* @version: 1.0
*/
@Configuration
@Slf4j
@Data
@ConfigurationProperties(prefix = "redundancy.mq")
public class RabbitMqForRedundancyConfig {
/**
* 交换机
*/
private String redundancyEventExchange;
/**
* 添加路由key
*/
private String addRoutingkey;
/**
* B端添加绑定key
*/
private String addBusinessBindingKey;
/**
* C端添加绑定key
*/
private String addConsumerBindingKey;
/**
* B端添加队列
*/
private String addBusinessQueue;
/**
* C端添加队列
*/
private String addConsumerQueue;
/**
* 创建冗余双写交换机
* @return
*/
@Bean
public Exchange redundancyEventExchange(){
return new TopicExchange(redundancyEventExchange);
}
/**
* 创建B端添加队列
* @return
*/
@Bean
public Queue addBusinessQueue(){
return new Queue(addBusinessQueue,true,false,false);
}
/**
* 创建C端添加队列
* @return
*/
@Bean
public Queue addConsumerQueue(){
return new Queue(addConsumerQueue,true,false,false);
}
/**
* B端绑定关系
*/
@Bean
public Binding addBusinessBinding(){
return new Binding(addBusinessQueue, Binding.DestinationType.QUEUE,redundancyEventExchange,
addBusinessBindingKey,null);
}
/**
* C端交换机绑定到队列
* @return
*/
@Bean
public Binding addConsumerBinding(){
return new Binding(addConsumerQueue, Binding.DestinationType.QUEUE,redundancyEventExchange,
addConsumerBindingKey,null);
}
}
🌟消息协议封装
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventMessage implements Serializable {
/**
* 消息队列id
*/
private String messageId;
/**
* 事件类型
*/
private String eventMessageType;
/**
* 业务id
*/
private String bizId;
/**
* 消息体
*/
private String content;
/**
* 异常备注
*/
private String remark;
}
🌟消息类型封装
public enum EventMessageTypeEnum {
REDUNDANCY_ADD,
REDUNDANCY_ADD_BUSINESS,
REDUNDANCY_ADD_CONSUMER,
REDUNDANCY_DEL,
REDUNDANCY_DEL_BUSINESS,
REDUNDANCY_DEL_CONSUMER,
REDUNDANCY_UPDATE,
REDUNDANCY_UPDATE_BUSINESS,
REDUNDANCY_UPDATE_CONSUMER,
}
🌟C端消费者
@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.consumer.queue")})
public class RedundancyAddConsumerMQListener {
@Autowired
private DetailService detailService;
/**
* 消费消息
* @param eventMessage
* @param message
* @param channel
*/
@RabbitHandler
public void handleAddConsumer(EventMessage eventMessage, Message message, Channel channel){
try {
eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name());
boolean flag= detailService.handleAddDetail(eventMessage);
} catch (Exception e) {
log.error("handleAddConsumer--消费失败{}",eventMessage);
}
}
}
🌟B端消费者
@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.business.queue")})
public class RedundancyAddBusinessMQListener {
@Autowired
private DetailService detailService;
/**
* 消费消息
* @param eventMessage
* @param message
* @param channel
*/
@RabbitHandler
public void handleAddBusiness(EventMessage eventMessage, Message message, Channel channel){
try {
eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name());
boolean flag= detailService.handleAddDetail(eventMessage);
} catch (Exception e) {
log.error("handleAddBusiness--消费失败{}",eventMessage);
}
}
}
🌟发送消息与处理消息
/**
* 发送新增消息
* @param detailRequest
*/
@Override
public void addDetail(DetailRequest detailRequest) {
detailRequest.setBId(IDUtil.generateRandomNumber(5));
detailRequest.setCId(IDUtil.generateRandomNumber(5));
//构造消息
EventMessage eventMessage = EventMessage.builder()
.messageId(IDUtil.generateRandomNumber(5).toString())
.content(JsonUtil.obj2Json(detailRequest))
.eventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD.name())
.build();
rabbitTemplate.convertAndSend(rabbitMqForRedundancyConfig.getRedundancyEventExchange(),
rabbitMqForRedundancyConfig.getAddRoutingkey(),eventMessage);
}
//处理新增消息
@Override
public boolean handleAddDetail(EventMessage eventMessage) {
String messageType= eventMessage.getEventMessageType();
DetailRequest detailRequest=JsonUtil.json2Obj(eventMessage.getContent(), DetailRequest.class);
if (messageType.equals(EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name())){
CDetailDO cDetailDOIndb=cDetailMapper.selectOne(new QueryWrapper<CDetailDO>()
.eq("c_id",detailRequest.getCId())
.eq("detail",detailRequest.getDetail()));
if (cDetailDOIndb==null){
CDetailDO cDetailDO = CDetailDO.builder()
.bId(detailRequest.getBId())
.cId(detailRequest.getCId())
.detail(detailRequest.getDetail())
.build();
cDetailMapper.insert(cDetailDO);
}else {
log.error("handleAddDetail---REDUNDANCY_ADD_CONSUMER重复{}",eventMessage);
}
} else if (messageType.equals(EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name())) {
BDetailDO bDetailDOIndb=bDetailMapper.selectOne(new QueryWrapper<BDetailDO>()
.eq("b_id",detailRequest.getCId())
.eq("detail",detailRequest.getDetail()));
if (bDetailDOIndb==null){
BDetailDO bDetailDO = BDetailDO.builder()
.bId(detailRequest.getBId())
.cId(detailRequest.getCId())
.detail(detailRequest.getDetail())
.build();
bDetailMapper.insert(bDetailDO);
}else {
log.error("handleAddDetail---REDUNDANCY_ADD_BUSINESS重复{}",eventMessage);
}
}
return false;
}
🌟最后
最后,感谢大家对本文的阅读,希望对大家有帮助。