项目实战之RabbitMQ冗余双写架构-LMLPHP


🌟架构图

项目实战之RabbitMQ冗余双写架构-LMLPHP

🌟application.properties

# Settings bound onto RabbitMqForRedundancyConfig via the "redundancy.mq" prefix.
# Name of the topic exchange that receives every redundancy event.
redundancy.mq.redundancy-event-exchange=redundancy.event.exchange
# Routing key the producer publishes "add" events with; it is written so that it
# matches BOTH binding keys below, which is what fans one message out to two queues.
redundancy.mq.add-routing-key=redundancy.add.business.consumer.routing.key
# Binding key of the B-side queue ("*" stands for exactly one dot-separated word).
redundancy.mq.add-business-binding-key=redundancy.add.business.*.routing.key
# Binding key of the C-side queue ("*" stands for exactly one dot-separated word).
redundancy.mq.add-consumer-binding-key=redundancy.add.*.consumer.routing.key
# Queue consumed by the B-side listener.
redundancy.mq.add-business-queue=redundancy.add.business.queue
# Queue consumed by the C-side listener.
redundancy.mq.add-consumer-queue=redundancy.add.consumer.queue

🌟RabbitMQ配置

package top.daencode.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Copyright (C) 2023-11-29  智源恩创网络科技工作室
 *
 * @BelongsProject: architecture-solution
 * @BelongsPackage: top.daencode.config
 * @author: DaenCode
 * @createTime: 2023-11-29  15:08
 * @description: Declares the RabbitMQ topology used for redundant double-write:
 *               one topic exchange, one queue for the B side, one queue for the
 *               C side, and the binding of each queue to the exchange. All names
 *               and keys are bound from properties under {@code redundancy.mq}.
 * @version: 1.0
 */
@Configuration
@Slf4j
@Data
@ConfigurationProperties(prefix = "redundancy.mq")
public class RabbitMqForRedundancyConfig {
    /** Name of the exchange all redundancy events are published to. */
    private String redundancyEventExchange;

    /** Routing key used when publishing an "add" event. */
    private String addRoutingkey;

    /** Binding key that routes "add" events to the B-side queue. */
    private String addBusinessBindingKey;

    /** Binding key that routes "add" events to the C-side queue. */
    private String addConsumerBindingKey;

    /** Name of the B-side "add" queue. */
    private String addBusinessQueue;

    /** Name of the C-side "add" queue. */
    private String addConsumerQueue;

    /**
     * Declares the redundancy double-write exchange as a topic exchange.
     *
     * @return the exchange named by {@code redundancyEventExchange}
     */
    @Bean
    public Exchange redundancyEventExchange() {
        return new TopicExchange(redundancyEventExchange);
    }

    /**
     * Declares the B-side "add" queue.
     *
     * @return the queue named by {@code addBusinessQueue}
     */
    @Bean
    public Queue addBusinessQueue() {
        return durableQueue(addBusinessQueue);
    }

    /**
     * Declares the C-side "add" queue.
     *
     * @return the queue named by {@code addConsumerQueue}
     */
    @Bean
    public Queue addConsumerQueue() {
        return durableQueue(addConsumerQueue);
    }

    /**
     * Binds the B-side queue to the redundancy exchange.
     *
     * @return the binding that uses {@code addBusinessBindingKey}
     */
    @Bean
    public Binding addBusinessBinding() {
        return bindToEventExchange(addBusinessQueue, addBusinessBindingKey);
    }

    /**
     * Binds the C-side queue to the redundancy exchange.
     *
     * @return the binding that uses {@code addConsumerBindingKey}
     */
    @Bean
    public Binding addConsumerBinding() {
        return bindToEventExchange(addConsumerQueue, addConsumerBindingKey);
    }

    /** Builds a queue with durable=true, exclusive=false, autoDelete=false. */
    private Queue durableQueue(String queueName) {
        return new Queue(queueName, true, false, false);
    }

    /** Builds a queue-to-exchange binding on the redundancy exchange with no extra arguments. */
    private Binding bindToEventExchange(String queueName, String bindingKey) {
        return new Binding(queueName, Binding.DestinationType.QUEUE, redundancyEventExchange,
                bindingKey, null);
    }
}

🌟消息协议封装

/**
 * Envelope exchanged over the message queue for redundancy events.
 *
 * <p>The class is {@link Serializable}, so it pins an explicit
 * {@code serialVersionUID}: without one the JVM derives the UID from the class
 * shape, and any later change to the class would make previously serialized
 * instances undeserializable.
 *
 * <p>NOTE(review): if Java serialization is the message converter in use,
 * messages already sitting in a queue were written with the compiler-derived
 * UID — drain the queues (or deploy producer and consumer together) when
 * rolling this change out.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventMessage implements Serializable {
    /** Explicit serialization version; bump only on incompatible changes. */
    private static final long serialVersionUID = 1L;

    /**
     * Identifier of this message.
     */
    private String messageId;

    /**
     * Event type; holds the name of an {@code EventMessageTypeEnum} constant.
     */
    private String eventMessageType;

    /**
     * Business identifier the event relates to.
     */
    private String bizId;

    /**
     * Message body (the payload carried by the event, as a string).
     */
    private String content;

    /**
     * Remark describing an exception or failure, if any.
     */
    private String remark;
}

🌟消息类型封装

/**
 * Event types carried in {@code EventMessage.eventMessageType} (stored by
 * constant name). Each operation has a generic type plus one type per side
 * (B side = BUSINESS, C side = CONSUMER).
 */
public enum EventMessageTypeEnum {
    // ---- add ----
    /** Generic "add" event. */
    REDUNDANCY_ADD,
    /** "Add" event to be applied on the B side. */
    REDUNDANCY_ADD_BUSINESS,
    /** "Add" event to be applied on the C side. */
    REDUNDANCY_ADD_CONSUMER,

    // ---- delete ----
    /** Generic "delete" event. */
    REDUNDANCY_DEL,
    /** "Delete" event for the B side. */
    REDUNDANCY_DEL_BUSINESS,
    /** "Delete" event for the C side. */
    REDUNDANCY_DEL_CONSUMER,

    // ---- update ----
    /** Generic "update" event. */
    REDUNDANCY_UPDATE,
    /** "Update" event for the B side. */
    REDUNDANCY_UPDATE_BUSINESS,
    /** "Update" event for the C side. */
    REDUNDANCY_UPDATE_CONSUMER;
}

🌟C端消费者

@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.consumer.queue")})
public class RedundancyAddConsumerMQListener {
    @Autowired
    private DetailService detailService;
    /**
     * 消费消息
     * @param eventMessage
     * @param message
     * @param channel
     */
    @RabbitHandler
    public void handleAddConsumer(EventMessage eventMessage, Message message, Channel channel){
        try {
            eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name());
            boolean flag= detailService.handleAddDetail(eventMessage);
        } catch (Exception e) {
            log.error("handleAddConsumer--消费失败{}",eventMessage);
        }
    }
}

🌟B端消费者

@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.business.queue")})
public class RedundancyAddBusinessMQListener {
    @Autowired
    private DetailService detailService;
    /**
     * 消费消息
     * @param eventMessage
     * @param message
     * @param channel
     */
    @RabbitHandler
    public void handleAddBusiness(EventMessage eventMessage, Message message, Channel channel){
        try {
            eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name());
            boolean flag= detailService.handleAddDetail(eventMessage);
        } catch (Exception e) {
            log.error("handleAddBusiness--消费失败{}",eventMessage);
        }
    }
}

🌟发送消息与处理消息

   /**
     * Publishes an "add" event for the given request.
     *
     * <p>Assigns newly generated B-side and C-side ids to the request, wraps
     * the request's JSON form in an {@code EventMessage} of type
     * {@code REDUNDANCY_ADD}, and sends it to the redundancy exchange with the
     * configured add routing key.
     *
     * @param detailRequest the request to publish; its bId and cId are overwritten
     */
    @Override
    public void addDetail(DetailRequest detailRequest) {
        // Stamp both ids before serializing so they travel inside the payload.
        detailRequest.setBId(IDUtil.generateRandomNumber(5));
        detailRequest.setCId(IDUtil.generateRandomNumber(5));

        // Build the message envelope.
        String messageId = IDUtil.generateRandomNumber(5).toString();
        EventMessage eventMessage = EventMessage.builder()
                .messageId(messageId)
                .content(JsonUtil.obj2Json(detailRequest))
                .eventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD.name())
                .build();

        String exchange = rabbitMqForRedundancyConfig.getRedundancyEventExchange();
        String routingKey = rabbitMqForRedundancyConfig.getAddRoutingkey();
        rabbitTemplate.convertAndSend(exchange, routingKey, eventMessage);
    }
/**
 * Handles an "add" event by inserting the detail row on the side named by the
 * message type, unless an equal row already exists.
 *
 * <p>For {@code REDUNDANCY_ADD_CONSUMER} the duplicate check is on
 * ({@code c_id}, {@code detail}); for {@code REDUNDANCY_ADD_BUSINESS} it is on
 * ({@code b_id}, {@code detail}). Any other (or missing) type is logged and ignored.
 *
 * <p>NOTE(review): the select-then-insert check is not atomic; without a unique
 * index two concurrent deliveries of the same message could both insert.
 *
 * @param eventMessage the event whose content is a JSON-encoded {@code DetailRequest}
 * @return {@code true} if a new row was inserted; {@code false} if the row
 *         already existed or the message type is not handled here
 */
@Override
    public boolean handleAddDetail(EventMessage eventMessage) {
        String messageType = eventMessage.getEventMessageType();
        DetailRequest detailRequest = JsonUtil.json2Obj(eventMessage.getContent(), DetailRequest.class);

        // Constant-first equals keeps a null message type from throwing.
        if (EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name().equals(messageType)) {
            CDetailDO cDetailDOIndb = cDetailMapper.selectOne(new QueryWrapper<CDetailDO>()
                    .eq("c_id", detailRequest.getCId())
                    .eq("detail", detailRequest.getDetail()));
            if (cDetailDOIndb != null) {
                log.error("handleAddDetail---REDUNDANCY_ADD_CONSUMER重复{}", eventMessage);
                return false;
            }
            CDetailDO cDetailDO = CDetailDO.builder()
                    .bId(detailRequest.getBId())
                    .cId(detailRequest.getCId())
                    .detail(detailRequest.getDetail())
                    .build();
            cDetailMapper.insert(cDetailDO);
            return true;
        }

        if (EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name().equals(messageType)) {
            // The B-side table is keyed by b_id, so the lookup must use the B id
            // (it previously compared b_id against the C id).
            BDetailDO bDetailDOIndb = bDetailMapper.selectOne(new QueryWrapper<BDetailDO>()
                    .eq("b_id", detailRequest.getBId())
                    .eq("detail", detailRequest.getDetail()));
            if (bDetailDOIndb != null) {
                log.error("handleAddDetail---REDUNDANCY_ADD_BUSINESS重复{}", eventMessage);
                return false;
            }
            BDetailDO bDetailDO = BDetailDO.builder()
                    .bId(detailRequest.getBId())
                    .cId(detailRequest.getCId())
                    .detail(detailRequest.getDetail())
                    .build();
            bDetailMapper.insert(bDetailDO);
            return true;
        }

        // Neither side claimed the message: surface it instead of dropping it silently.
        log.warn("handleAddDetail---unsupported message type {}", eventMessage);
        return false;
    }

🌟最后

最后，感谢大家阅读本文，希望对大家有所帮助。


项目实战之RabbitMQ冗余双写架构-LMLPHP

12-02 10:38