一、消息队列介绍
1、消息队列概念
2、常⽤的消息队列产品
⼆、RabbitMQ
1、RabbitMQ介绍
2、RabbitMQ安装和配置
3、RabbitMQ逻辑结构
三、RabbitMQ⽤户管理
1、逻辑结构
2、⽤户管理
2.1、命令⾏⽤户管理
2.2、管理系统进⾏⽤户管理
四、RabbitMQ⼯作⽅式
1、简单模式
2、⼯作模式
3、订阅模式
4、路由模式
五、RabbitMQ交换机和队列管理
1、创建队列
2、创建交换机
3、交换机绑定队列
六、在普通的Maven应⽤中使⽤MQ
1、简单模式
1.1、消息⽣产者
1.2、消息消费者
2、⼯作模式
2.1、发送者
public class SendMsg { public static void main(String[] args) throws Exception{ System.out.println("请输⼊消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicPublish("","queue2",null,msg.getBytes()); System.out.println("发送:" + msg); channel.close(); connection.close(); } } }
2.2、消费者1
public class ReceiveMsg { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer1接收:"+msg); if("wait".equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume("queue2",true,consumer); } }
2.3、消费者2
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; channel.basicConsume("queue2",true,consumer); } }
3、订阅模式
1、发送者 发送消息到交换机
public class SendMsg { public static void main(String[] args) throws Exception{ System.out.println("请输⼊消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicPublish("ex1","",null,msg.getBytes()); System.out.println("发送:" + msg); channel.close(); connection.close(); } } }
2、消费者1
public class ReceiveMsg1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer1接收:"+msg); if("wait".equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume("queue3",true,consumer); } }
3、消费者2
public class ReceiveMsg2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } } ; channel.basicConsume("queue4",true,consumer); } }
4、路由模式
1、发送者 发送消息到交换机
public class SendMsg { public static void main(String[] args) throws Exception{ System.out.println("请输⼊消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); if(msg.startsWith("a")){ channel.basicPublish("ex2","a",null,msg.getBytes()); } else if(msg.startsWith("b")){ channel.basicPublish("ex2","b",null,msg.getBytes()); } System.out.println("发送:" + msg); channel.close(); connection.close(); } } }
2、消费者1
public class ReceiveMsg1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer1接收:"+msg); if("wait".equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume("queue5",true,consumer); } }
3、消费者2
public class ReceiveMsg2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; channel.basicConsume("queue6",true,consumer); } }
七、在SpringBoot应⽤中使⽤MQ
1、消息⽣产者
2、消息消费者
⼋、使⽤RabbitMQ传递对象
1、使⽤序列化对象
2、使⽤序列化字节数组
3、使⽤JSON字符串传递
九、基于Java的交换机与队列创建
1、普通Maven项⽬交换机及队列创建
2、SpringBoot应⽤中通过配置完成队列的创建
@Configuration public class RabbitMQConfiguration { //声明队列 @Bean public Queue queue9(){ Queue queue9 = new Queue("queue9"); //设置队列属性 return queue9; } @Bean public Queue queue10(){ Queue queue10 = new Queue("queue10"); //设置队列属性 return queue10; } //声明订阅模式交换机 @Bean public FanoutExchange ex5(){ return new FanoutExchange("ex5"); } //声明路由模式交换机 @Bean public DirectExchange ex6(){ return new DirectExchange("ex6"); } //绑定队列 @Bean public Binding bindingQueue9(Queue queue9, DirectExchange ex6){ return BindingBuilder.bind(queue9).to(ex6).with("k1"); } @Bean public Binding bindingQueue10(Queue queue10, DirectExchange ex6){ return BindingBuilder.bind(queue10).to(ex6).with("k2"); } }
⼗、消息的可靠性
1、RabbitMQ事务
Connection connection = RabbitMQUtil.getConnection(); //connection 表示与 host1的连接 Channel channel = connection.createChannel(); channel.txSelect();//开启事务 try{ channel.basicPublish("ex4", "k1", null, msg.getBytes()); channel.txCommit();//提交事务 } catch (Exception e){ channel.txRollback();//事务回滚 } finally{ channel.close(); connection.close(); }
2、RabbitMQ消息确认和return机制
2.1、普通Maven项⽬的消息确认
2.2、普通Maven项⽬的return机制
2.3、在SpringBoot应⽤实现消息确认与return监听
3、RabbitMQ消费者⼿动应答
@Component @RabbitListener(queues="queue01") public class Consumer1 { @RabbitHandler public void process(String msg,Channel channel, Message message) throws IOException { try { System.out.println("get msg1 success msg = "+msg); /** * 确认⼀条消息:<br> * channel.basicAck(deliveryTag, false); <br> * deliveryTag:该消息的index <br> * multiple:是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息 <br> */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { //消费者处理出了问题,需要告诉队列信息消费失败 /** * 拒绝确认消息:<br> * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br> * deliveryTag:该消息的index<br> * multiple:是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。<br> * requeue:被拒绝的是否重新⼊队列 <br> */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.err.println("get msg1 failed msg = "+msg); } } }
4、消息消费的幂等性问题
⼗⼀、延迟机制
1、延迟队列
2、使⽤延迟队列实现订单⽀付监控
⼗⼆、消息队列作⽤/使⽤场景总结
1、解耦
2、异步
3、消息通信
4、流量削峰
5、⽇志处理