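RabbitMQ message reliability has two main aspects. One is a retry mechanism on the consumer side (implemented with @Retryable, which lets you configure the number of retries and the retry interval, provided the consumer is idempotent). The other is reliable delivery on the producer side (here too the consumer must be idempotent). This article focuses on reliable delivery by the producer.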

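    The send flow of rabbitTemplate is:

    1 Send the data and return immediately (without confirming that the RabbitMQ server has received it)

    2 Asynchronously receive the ack confirmation returned by RabbitMQ

    3 After the ack arrives, call the confirmCallback function

    Note: confirmCallback does not receive the original message, so a re-send cannot be triggered from inside the callback; it only serves as a notification. Consequently, if the connection drops at any point during steps 2 and 3, we cannot tell whether the data was actually delivered, and messages can be lost.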

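    Only one approach gives a complete guarantee: RabbitMQ's transaction mechanism. With transactions enabled, however, RabbitMQ's throughput is very low, on the order of a few hundred messages per second, which makes it impractical.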

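    The second approach is synchronous sending: the client sends the data, RabbitMQ returns an ack after receiving it, and the send function returns only once the ack has arrived. In pseudocode:

create channel
send message
wait for ack (or timeout)
close channel
return success or failure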

    Likewise, because every send has to set up a new channel and then block until the ack arrives, this approach is also inefficient.
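    The synchronous variant can be sketched in plain Java. This is only an illustration under stated assumptions, not Spring AMQP code: `SyncSendSketch`, `sendAndWait`, and the `publish` callback that stands in for the broker are hypothetical names. The caller blocks until the ack arrives or the timeout expires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical sketch: block the sender until the broker acks or a timeout expires. */
class SyncSendSketch {

    /**
     * @param publish   stands in for "send message"; it must complete the future
     *                  with true (ack) or false (nack) once the broker answers
     * @param timeoutMs how long to wait for the ack, in milliseconds
     * @return true only if an ack arrived in time
     */
    static boolean sendAndWait(Consumer<CompletableFuture<Boolean>> publish, long timeoutMs) {
        CompletableFuture<Boolean> ack = new CompletableFuture<>();
        publish.accept(ack); // send message
        try {
            // wait for ack (or timeout)
            return ack.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // no usable answer in time: report failure to the caller
            return false;
        } catch (InterruptedException e) {
            // keep the interrupt flag set for the caller and report failure
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

    For example, `SyncSendSketch.sendAndWait(f -> f.complete(true), 100)` returns true, while a callback that never completes the future makes the call return false once the timeout elapses.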

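    Based on the analysis above, we use a different approach to avoid losing data.

    Building on rabbitTemplate's asynchronous confirms:

    1 Cache each sent message in Redis

    2 When confirmCallback reports an ack, delete the confirmed message from the cache

    3 Periodically scan the cached messages and re-send any that have gone unconfirmed for longer than a set time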
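    The three steps above can be sketched in plain Java. This is a minimal sketch, not the article's implementation: an in-memory map stands in for Redis, and `PendingStoreSketch`, `beforeSend`, `onConfirm`, and `dueForResend` are hypothetical names chosen for illustration. It also shows why the callback needs nothing but the message id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for the Redis cache of unconfirmed messages. */
class PendingStoreSketch {

    /** A cached message together with the time it was sent. */
    static final class Pending {
        final String body;
        final long sentAt;

        Pending(String body, long sentAt) {
            this.body = body;
            this.sentAt = sentAt;
        }
    }

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();

    /** Step 1: remember the message before it is published. */
    void beforeSend(String id, String body, long now) {
        pending.put(id, new Pending(body, now));
    }

    /** Step 2: the confirm callback only knows the id, which is enough to remove the entry. */
    void onConfirm(String id, boolean ack) {
        if (ack) {
            pending.remove(id);
        }
    }

    /** Step 3: ids of messages that have waited longer than maxAgeMs and should be re-sent. */
    List<String> dueForResend(long now, long maxAgeMs) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Pending> e : pending.entrySet()) {
            if (now - e.getValue().sentAt > maxAgeMs) {
                due.add(e.getKey());
            }
        }
        return due;
    }
}
```

    A nacked or never-confirmed message simply stays in the store, so the next scan picks it up once it is older than the threshold.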

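    This approach has a problem of its own, of course. Suppose RabbitMQ receives the message but the network fails while it is sending the ack: the client never gets the ack and re-sends the message. (Duplicate messages are much easier to handle than lost ones, because the consumer can be made idempotent.) The code for automatic retry is as follows: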

    

package cn.chinotan.service.reliabletransmission;

/**
 * @program: test
 * @description: RabbitMQ constants
 * @author: xingcheng
 * @create: 2018-08-12 12:30
 **/
public class MyConstant {

    public static final String MY_EXCHANGE = "my_exchange";

    public static final String ERROR_EXCHANGE = "error_exchange";

    public static final String MY_QUEUE_THREE = "my_queue_three";

    public final static String KEY_PREFIX = "test:rabbitMq:";

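    /**
     * Wait time after a consumer failure: one minute, in milliseconds
     */
    public static final int ONE_MINUTE = 1 * 60 * 1000;

    /**
     * Interval between retry scans of cached MQ messages (ms)
     */
    public static final int RETRY_TIME_INTERVAL = ONE_MINUTE;

    /**
     * Validity period of an MQ message (ms); an unconfirmed message older than this is re-sent
     */
    public static final int VALID_TIME = ONE_MINUTE;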

}
package cn.chinotan.service.reliabletransmission;

import java.io.Serializable;

/**
 * @program: test
 * @description: message wrapper (id, send time, and JSON payload)
 * @author: xingcheng
 * @create: 2018-09-24 15:32
 **/
public class MessageWithTime implements Serializable {

    private String id;
    private long time;
    private String message;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
package cn.chinotan.service.reliabletransmission;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Configuration
public class ReliableRabbitConfig {

    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange(MyConstant.MY_EXCHANGE, true, false);
    }

    @Bean
    public Queue myQueueOne() {
        return new Queue(MyConstant.MY_QUEUE_THREE, true);
    }

    @Bean
    public Binding queueOneBinding() {
        return BindingBuilder.bind(myQueueOne()).to(myExchange()).withQueueName();
    }
}

package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.UUID;

/**
 * @program: test
 * @description: rabbitService
 * @author: xingcheng
 * @create: 2018-09-24 14:28
 **/
@Service
public class RabbitMQService {

    Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

    @Autowired
    StringRedisTemplate redisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Single shared confirm callback. A RabbitTemplate accepts only one ConfirmCallback,
     * so the same instance is registered on every send instead of a new lambda per call.
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        String id = correlationData == null ? null : correlationData.getId();
        if (ack) {
            logger.info("message send success--id:[{}]", id);
            // Confirmed by the broker: delete the cached copy from Redis
            if (id != null) {
                redisTemplate.delete(id);
            }
        } else {
            // Not confirmed: log it and leave the cached copy for the retry task
            logger.error("message send fail--id:[{}], cause:[{}]", id, cause);
        }
    };

    public Boolean send(String exchange, String routingKey, Object message) {
        try {
            String key = StringUtils.join(MyConstant.KEY_PREFIX, UUID.randomUUID().toString().replace("-", "").toLowerCase());

            // Before sending, cache the message, its send time, and its id in Redis
            MessageWithTime messageWithTime = new MessageWithTime();
            messageWithTime.setId(key);
            messageWithTime.setMessage(JSONObject.toJSONString(message));
            messageWithTime.setTime(System.currentTimeMillis());
            redisTemplate.opsForValue().set(key, JSONObject.toJSONString(messageWithTime));

            // Register the asynchronous confirm callback (always the same instance)
            rabbitTemplate.setConfirmCallback(confirmCallback);

            CorrelationData correlationData = new CorrelationData(key);
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("error while sending message", e);
            return false;
        }

        return true;
    }

    /**
     * Re-sends a message that is already cached in Redis, reusing its original id.
     */
    Boolean send(String exchange, String routingKey, MessageWithTime message) {
        try {
            // Register the asynchronous confirm callback (always the same instance)
            rabbitTemplate.setConfirmCallback(confirmCallback);

            CorrelationData correlationData = new CorrelationData(message.getId());
            Map map = JSON.parseObject(message.getMessage(), Map.class);
            rabbitTemplate.convertAndSend(exchange, routingKey, map, correlationData);
        } catch (Exception e) {
            logger.error("error while re-sending message", e);
            return false;
        }

        return true;
    }

}
package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Producer
 */
@Service
public class ReliableProducr {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReliableProducr.class);

    @Autowired
    private RabbitMQService rabbitMQService;

    public Boolean send(Map msg) {
        return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
    }

    public Boolean send(MessageWithTime msg) {
        return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
    }
}

package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

/**
 * @program: test
 * @description: listener that starts the reliable-delivery retry thread
 * @author: xingcheng
 * @create: 2018-09-24 16:05
 **/
@WebListener
public class ReliableTransContextListener implements ServletContextListener {

    Logger logger = LoggerFactory.getLogger(ReliableTransContextListener.class);

    private WebApplicationContext springContext;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        logger.info("ReliableTransContextListener init start...........");
        springContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
        if (springContext != null) {
            RetryCache retryCache = (RetryCache) springContext.getBean("retryCache");
            new Thread(() -> retryCache.startRetry()).start();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;

/**
 * @program: test
 * @description: retries unconfirmed messages cached in Redis
 * @author: xingcheng
 * @create: 2018-09-24 16:12
 **/
@Component("retryCache")
public class RetryCache {

    private boolean stop = false;

    Logger logger = LoggerFactory.getLogger(RetryCache.class);

    @Autowired
    private ReliableProducr producr;

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final String STAR = "*";

    public void startRetry() {
        while (!stop) {
            try {
                Thread.sleep(MyConstant.RETRY_TIME_INTERVAL);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the retry loop
                Thread.currentThread().interrupt();
                return;
            }

            long now = System.currentTimeMillis();

            // Every key under the prefix is a message that has not been confirmed yet
            Set<String> keys = redisTemplate.keys(StringUtils.join(MyConstant.KEY_PREFIX, STAR));
            if (keys != null && !keys.isEmpty()) {
                List<String> list = redisTemplate.opsForValue().multiGet(keys);
                list.forEach(value -> {
                    MessageWithTime messageWithTime = JSON.parseObject(value, MessageWithTime.class);

                    if (null != messageWithTime) {
                        if (messageWithTime.getTime() + 3 * MyConstant.VALID_TIME < now) {
                            // Unconfirmed for more than three validity periods: give up and drop it
                            logger.error("send message {} failed after 3 min ", messageWithTime.getId());
                            redisTemplate.delete(messageWithTime.getId());
                        } else if (messageWithTime.getTime() + MyConstant.VALID_TIME < now) {
                            // Unconfirmed for more than one validity period: re-send it
                            logger.info("re-delivering message {}", messageWithTime.getId());
                            Boolean send = producr.send(messageWithTime);
                            if (!send) {
                                logger.error("retry send message failed {}", messageWithTime.getId());
                            }
                        }
                    }
                });
            }
        }
    }

}
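
The age check that RetryCache applies to each cached message can be isolated as a small pure function, which makes the thresholds easier to see and to test. This is a minimal sketch, not part of the original listing: `RetryDecisionSketch`, `Action`, and `decide` are hypothetical names, and `validTimeMs` plays the role of MyConstant.VALID_TIME.

```java
/** Hypothetical sketch of the age check applied to each cached message. */
class RetryDecisionSketch {

    enum Action { WAIT, RESEND, DROP }

    /**
     * @param sentAt      time the message was first sent (ms)
     * @param now         current time (ms)
     * @param validTimeMs validity period (ms), MyConstant.VALID_TIME in the listing
     */
    static Action decide(long sentAt, long now, long validTimeMs) {
        long age = now - sentAt;
        if (age > 3 * validTimeMs) {
            return Action.DROP;   // unconfirmed for too long: log an error and delete
        }
        if (age > validTimeMs) {
            return Action.RESEND; // overdue but not expired: publish again
        }
        return Action.WAIT;       // still within the validity period
    }
}
```

With a validity period of 60000 ms, a message that is 30 seconds old is left alone, one that is 90 seconds old is re-sent, and one that is 200 seconds old is dropped. Because the send time is not refreshed on a re-send, a message that never gets confirmed is retried on successive scans until it crosses the three-period limit.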
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * Consumer for queueThree
 */
@Component
public class MyQueueThreeConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueThreeConsumer.class);

    /**
     * The consumer must be idempotent, because a message may be delivered more than once
     *
     * @param content the message payload
     */
    @RabbitListener(queues = MyConstant.MY_QUEUE_THREE)
    @RabbitHandler
    public void process(Map content) {
        LOGGER.info("consumer: queueThree started at {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        LOGGER.info("consumer: queueThree consumed content: [{}]", JSON.toJSONString(content));
    }
}
import cn.chinotan.service.reliabletransmission.MyConstant;
import cn.chinotan.service.reliabletransmission.RabbitMQService;
import cn.chinotan.service.reliabletransmission.ReliableProducr;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * @program: test
 * @description: reliable delivery test
 * @author: xingcheng
 * @create: 2018-09-24 15:57
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class)
public class ReliableTransmissionTest {

    @Autowired
    private ReliableProducr producr;

    @Autowired
    private RabbitMQService rabbitMQService;

    /**
     * Test of the normal case
     * @throws Exception
     */
    @Test
    public void reliableTransmissionTest() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "xingheng");
        producr.send(map);
    }

    /**
     * Test of the failure case
     * @throws Exception
     */
    @Test
    public void reliableTransmissionFailTest() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "xingheng");
        rabbitMQService.send(MyConstant.ERROR_EXCHANGE, MyConstant.MY_QUEUE_THREE, map);
    }
}

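Notes:

1. Publisher confirms must be enabled in the configuration, like this (on Spring Boot 2.2 and later this property is deprecated in favor of spring.rabbitmq.publisher-confirm-type: correlated):

spring:
  rabbitmq:
    publisher-confirms: true

2. To test the failure case, simply send the message to an exchange that does not exist

3. Make sure the consumer is idempotent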
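
One common way to make a consumer idempotent is to remember the ids of messages that have already been handled and skip repeats. The following is a minimal sketch under stated assumptions, not the article's code: `IdempotentConsumerSketch` and `handle` are hypothetical names, the in-memory set stands in for a shared store such as Redis, and it assumes the producer also puts the message id into the payload or a header (in the listing above the id travels only in CorrelationData, which the consumer does not see).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch of a consumer that skips message ids it has already handled. */
class IdempotentConsumerSketch {

    // Assumption: ids fit in memory; a real deployment would use a shared store with expiry
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * @return true if the handler ran, false if the id was seen before and the message was skipped
     */
    boolean handle(String messageId, String body, Consumer<String> handler) {
        // add() returns false when the id is already present, i.e. a duplicate delivery
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(body);
        } catch (RuntimeException e) {
            // Processing failed: forget the id so that a later redelivery is handled again
            processedIds.remove(messageId);
            throw e;
        }
        return true;
    }
}
```

Calling `handle` twice with the same id runs the handler only once; the second call returns false without touching the business logic.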

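Simple test results:

[screenshots not preserved]

In the failure test, the retry publishes the message to the correct exchange (the retry path always uses MyConstant.MY_EXCHANGE), so after one retry the delivery succeeds.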

03-07 15:33