目录:
- RabbitMQ几大组件
- 交换器类型
- RabbitMQ运行流程
RabbitMQ几大组件:(与RabbitMQ第一节中AMQP一样,不细说)
1、生产者、消费者、消息
2、Broker:简单的来说broker就是一个RabbitMQ的一个服务节点
3、队列
4、交换器、路由键、绑定键
交换器类型:
1、fanout(分列):把所以发送到该交换器上的消息,路由到与改交换器绑定的队列中。
1 public class Product { 2 3 private static final String EXCHANGE_NAME = "exchange"; 4 private static final String QUEUE_NAME = "exchange_queue1"; 5 private static final String QUEUE_NAME2 = "exchange_queue2"; 6 private static final String ROUTING_KEY = "exchange_routingKey"; 7 private static final String ROUTING_KEY2 = "exchange_routingKey2"; 8 9 public static void main(String[] args) throws IOException, TimeoutException { 10 Connection connection = getConnection(); 11 Channel channel = connection.createChannel(); 12 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); 13 14 // 定义队列1、队列2 15 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 16 channel.queueDeclare(QUEUE_NAME2, false, false, false, null); 17 // 将队列1、队列2绑定到同一个fanout交换器上,但路由键不一样 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); 19 channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, ROUTING_KEY2); 20 21 // 仅发送路由键1的消息,但因绑定键是fanout类型,所以队列1和队列2都会受到消息 22 channel.basicPublish(EXCHANGE_NAME, 23 ROUTING_KEY, 24 MessageProperties.PERSISTENT_TEXT_PLAIN, 25 "Hello world".getBytes()); 26 27 close(connection, channel); 28 } 29 }
2、direct(直接):需要bindingKey和routingKey完全匹配才能成功发送消息。
就fanout例子的第12行代码修改为channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);(注意:修改代码运行前,需要先将原先绑定的交换器删掉,因为之前那个交换器已经是fantou类型了;或者是定义一个新的交换器)
便只有exchange_queue1才能接受到消息,便为2。
3、topic(主题):与direct类似,但支持通配符匹配(类似于广播)。
- #:匹配多个或零个单词
- *:匹配一个单词
4、header:不是根据路由键的匹配规则,而是通过发送消息中的header来匹配。
RabbitMQ运行流程:
1、生产者:
- 生产者与Broker建立连接并开启信道
- 生产者声明交换器(交换器类型、是否持久化、是否自动删除)、队列(是否持久化、是否排他、是否自动删除)
- 生产者通过路由键将交换器和队列绑定
- 生产者发送消息到Broker(携带路由键等)
- 交换器再根据接收到的路由键以及交换器类型查找相匹配的队列,如果匹配上了将消息存入队列中,若未匹配上,则根据生产者的配置决定丢弃或是退回给生产者
- 关闭信道和连接
2、消费者:
- 消费者与Broker建立连接并开启信道
- 消费者向Broker请求消费相对应的队列,此中可以设置回调函数
- 接受并处理消息
- 消费者可以确认消息(ack)
- RabbitMQ从队列中删除已被确认的消息
- 关闭信道和连接