一、消息队列介绍

1、消息队列概念

RabbitMQ保姆级教程最佳实践-LMLPHP

2、常⽤的消息队列产品

RabbitMQ保姆级教程最佳实践-LMLPHP

⼆、RabbitMQ

1、RabbitMQ介绍

2、RabbitMQ安装和配置

3、RabbitMQ逻辑结构

RabbitMQ保姆级教程最佳实践-LMLPHP

三、RabbitMQ⽤户管理

1、逻辑结构

2、⽤户管理

2.1、命令⾏⽤户管理

2.2、管理系统进⾏⽤户管理

RabbitMQ保姆级教程最佳实践-LMLPHP

RabbitMQ保姆级教程最佳实践-LMLPHP

RabbitMQ保姆级教程最佳实践-LMLPHP

RabbitMQ保姆级教程最佳实践-LMLPHP

四、RabbitMQ⼯作⽅式

1、简单模式

RabbitMQ保姆级教程最佳实践-LMLPHP

2、⼯作模式

RabbitMQ保姆级教程最佳实践-LMLPHP

3、订阅模式

RabbitMQ保姆级教程最佳实践-LMLPHP

4、路由模式

RabbitMQ保姆级教程最佳实践-LMLPHP

五、RabbitMQ交换机和队列管理

1、创建队列

RabbitMQ保姆级教程最佳实践-LMLPHP

2、创建交换机

RabbitMQ保姆级教程最佳实践-LMLPHP

3、交换机绑定队列

RabbitMQ保姆级教程最佳实践-LMLPHP

六、在普通的Maven应⽤中使⽤MQ

RabbitMQ保姆级教程最佳实践-LMLPHP

1、简单模式

1.1、消息⽣产者

1.2、消息消费者

2、⼯作模式

2.1、发送者

public class SendMsg {
    public static void main(String[] args) throws Exception{
        System.out.println("请输⼊消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.basicPublish("","queue2",null,msg.getBytes());
            System.out.println("发送:" + msg);
            channel.close();
            connection.close();
        }
    }
}

2.2、消费者1

public class ReceiveMsg {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        channel.basicConsume("queue2",true,consumer);
    }
}

2.3、消费者2

public class ReceiveMsg {
    public static void main(String[] args) throws IOException,
    TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };
        channel.basicConsume("queue2",true,consumer);
    }
}

3、订阅模式

1、发送者 发送消息到交换机

public class SendMsg {
    public static void main(String[] args) throws Exception{
        System.out.println("请输⼊消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.basicPublish("ex1","",null,msg.getBytes());
            System.out.println("发送:" + msg);
            channel.close();
            connection.close();
        }
    }
}

2、消费者1

public class ReceiveMsg1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        channel.basicConsume("queue3",true,consumer);
    }
}

3、消费者2

public class ReceiveMsg2 {
    public static void main(String[] args) throws IOException,
    TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        }
        ;
        channel.basicConsume("queue4",true,consumer);
    }
}

4、路由模式

1、发送者 发送消息到交换机

public class SendMsg {
    public static void main(String[] args) throws Exception{
        System.out.println("请输⼊消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            if(msg.startsWith("a")){
                channel.basicPublish("ex2","a",null,msg.getBytes());
            } else if(msg.startsWith("b")){
                channel.basicPublish("ex2","b",null,msg.getBytes());
            }
            System.out.println("发送:" + msg);
            channel.close();
            connection.close();
        }
    }
}

2、消费者1

public class ReceiveMsg1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        channel.basicConsume("queue5",true,consumer);
    }
}

3、消费者2

public class ReceiveMsg2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };
        channel.basicConsume("queue6",true,consumer);
    }
}

七、在SpringBoot应⽤中使⽤MQ

1、消息⽣产者

2、消息消费者

⼋、使⽤RabbitMQ传递对象

1、使⽤序列化对象

2、使⽤序列化字节数组

3、使⽤JSON字符串传递

九、基于Java的交换机与队列创建

1、普通Maven项⽬交换机及队列创建

2、SpringBoot应⽤中通过配置完成队列的创建

@Configuration
public class RabbitMQConfiguration {
    //声明队列
    @Bean
    public Queue queue9(){
        Queue queue9 = new Queue("queue9");
        //设置队列属性
        return queue9;
    }
    @Bean
    public Queue queue10(){
        Queue queue10 = new Queue("queue10");
        //设置队列属性
        return queue10;
    }
    //声明订阅模式交换机
    @Bean
    public FanoutExchange ex5(){
        return new FanoutExchange("ex5");
    }
    //声明路由模式交换机
    @Bean
    public DirectExchange ex6(){
        return new DirectExchange("ex6");
    }
    //绑定队列
    @Bean
    public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
        return BindingBuilder.bind(queue9).to(ex6).with("k1");
    }
    @Bean
    public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
        return BindingBuilder.bind(queue10).to(ex6).with("k2");
    }
}

⼗、消息的可靠性

RabbitMQ保姆级教程最佳实践-LMLPHP

1、RabbitMQ事务

Connection connection = RabbitMQUtil.getConnection(); //connection 表示与 host1的连接
Channel channel = connection.createChannel();
channel.txSelect();//开启事务
try{
    channel.basicPublish("ex4", "k1", null, msg.getBytes());
    channel.txCommit();//提交事务
}
catch (Exception e){
    channel.txRollback();//事务回滚
}
finally{
    channel.close();
    connection.close();
}

2、RabbitMQ消息确认和return机制

RabbitMQ保姆级教程最佳实践-LMLPHP

2.1、普通Maven项⽬的消息确认

2.2、普通Maven项⽬的return机制

2.3、在SpringBoot应⽤实现消息确认与return监听

3、RabbitMQ消费者⼿动应答 

@Component
@RabbitListener(queues="queue01")
public class Consumer1 {
    @RabbitHandler
    public void process(String msg,Channel channel, Message message) throws IOException {
        try {
            System.out.println("get msg1 success msg = "+msg);
            /**
         * 确认⼀条消息:<br>
         * channel.basicAck(deliveryTag, false); <br>
         * deliveryTag:该消息的index <br>
         * multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息 <br>
       */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            //消费者处理出了问题,需要告诉队列信息消费失败
            /**
         * 拒绝确认消息:<br>
         * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
         * deliveryTag:该消息的index<br>
         * multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。<br>
         * requeue:被拒绝的是否重新⼊队列 <br>
       */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.err.println("get msg1 failed msg = "+msg);
        }
    }
}

4、消息消费的幂等性问题

⼗⼀、延迟机制

1、延迟队列

RabbitMQ保姆级教程最佳实践-LMLPHP

2、使⽤延迟队列实现订单⽀付监控

⼗⼆、消息队列作⽤/使⽤场景总结

1、解耦

RabbitMQ保姆级教程最佳实践-LMLPHP

RabbitMQ保姆级教程最佳实践-LMLPHP

2、异步

RabbitMQ保姆级教程最佳实践-LMLPHP

3、消息通信

RabbitMQ保姆级教程最佳实践-LMLPHP

4、流量削峰

RabbitMQ保姆级教程最佳实践-LMLPHP

5、⽇志处理

RabbitMQ保姆级教程最佳实践-LMLPHP

09-24 02:03