三个方面保证消息的可靠性

  1.消息的持久化

  2.事物

  3.签收

一:PERSISTENT:持久性

    参数说明:1.持久化

         2.非持久化

   Java里面设置持久化和非持久化

  持久化:

    将持久性设置为持久化

     宕机前,数据正常,未被消费

  

    服务器恢复后,数据仍然存在,未被消费的消息为3  

  非持久化

    设置为非持久化

    宕机前

     服务器恢复后,消息全部丢失

  上面例子指定了持久化和非持久化,如果不指定的话默认为持久化

  以上是针对队列模式(queue)

   主题模式(Topic)

    主题模式的持久化是针对订阅者,因为订阅模式下生产者将消息发送出去就啥也不管了,如果没有订阅者,消息就等于是废消息,一点意义也没有,所以应该在消费者端进行持久化

   实验:

   1.首先消费者先订阅:如图一个订阅者(在线)

  代码:

package com.steak.activemq.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

import java.io.IOException;

public class Consumer {

private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";

    private static final StringQUEUE ="topic_persist";

    public static void main(String[] args)throws JMSException, IOException {

//创建连接工厂

        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);

        //通过连接工厂,获得连接

        Connection connection = activeMQConnectionFactory.createConnection();

        connection.setClientID("刘牌");//订阅者

        //创建session,第一个参数叫事物,第二个叫签收

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //创建目的地

        Topic topic = session.createTopic(QUEUE);

        //持久化订阅者

        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark");

        connection.start();

        Message message = topicSubscriber.receive();

        while (null != message){

TextMessage textMessage = (TextMessage) message;

            System.out.println("topic持久化的消息  "+textMessage.getText());

            //如果1秒钟以后收不到消息,自动断开,相当于取关

            message = topicSubscriber.receive(1000L);

        }

session.close();

        connection.close();

    }

}

     2.然后启动生产者(发布者):此时发布了三条,订阅者收到了三条

   代码:

package com.steak.activemq.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {

private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";

    private static final StringQUEUE ="topic_persist";

    public static void main(String[] args)throws JMSException {

//创建连接工厂

        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);

        //通过连接工厂,获得连接

        Connection connection = activeMQConnectionFactory.createConnection();

        //创建session

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //创建目的地

        Topic topic = session.createTopic(QUEUE);

        //创建消息的生产者

        MessageProducer messageProducer = session.createProducer(topic);

            connection.start();

        //通过使用messageProducer生产消息发送到MQ队列里

        for (int i =0 ; i <3 ; i++){

//创建消息

            TextMessage textMessage = session.createTextMessage("消息 "+i);

            //通过messageProducer发送消息

            messageProducer.send(textMessage);

        }

//关闭资源

        messageProducer.close();

        session.close();

        connection.close();

        System.out.println("消息发送完成");

    }

}

  因为我们设置了1秒钟过后如果收不到消息就断开连接,所以消费者从在线变为离线

   如果设置为receive(),则一直监听(相当于微信公众号一直都关注,一直都能收到消息)

  无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收到的消息都接收过来(相当于我取关了,不能收到消息,但是我再此关注,我也能把我取关的这段时间的消息都收到),前提时注册过一次,第一注册以前的消息肯定是收不到

  

01-19 15:37