交换器

RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。生产者只将消息发送到 Exchange 交换器中,并不知道消息是否会被传送到队列。交换器负责接收生产者生产的消息,并通过一定路由规则将消息发送到指定的队列,起到一个传递的作用

类型介绍

RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP规范里还提到两种 Exchange Type,分别为 system 与自定义,这里不予以描述)。

fanout

fanout 类型的 Exchange 路由规则非常。它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中。这种模式在 RabboitMQ 官方介绍中称之为:发布/订阅

图 1 fanout Exchange

来看下官方给出的代码示例:

emit_log.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);
// 获取系统返回的队列名称
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 将 Queue 和 Exchange 绑定
$channel->queue_bind($queue_name, $exchange_name);

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

执行 shell 命令

php receive_logs.php

php emit_log.php 

direct

direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 binding key 与
routing key 完全匹配的 Queue 中。官方说明:direct

图 2 direct Exchange

以上图的配置为例,我们以 routingKey=”error”发送消息到 Exchange,则消息会路由到 Queue1(amqp.gen-S9b…,这是由 RabbitMQ 自动生成的 Queue 名称)和 Queue2(amqp.gen-Agl…);如果我们以 routingKey=”info”或 routingKey=”warning”来发送消息,则消息只会路由到 Queue2。如果我们以其他 routingKey 发送消息,则消息不会路由到这两个 Queue 中。

来看下官方给出的代码示例:

emit_log.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

// basic_publish 函数中第三个参数指定 routingKey,将消息、交换器和 routingKey 绑定到一起
$channel->basic_publish($msg, $exchange_name, $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// 循环调用 queue_bind 函数,第三个参数 routingKey,将队列、交换器和 routingKey 绑定到一起,交换器会根据 routingKey 将消息路由到绑定的队列中
foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, $exchange_name, $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

执行 shell 命令:

 # 生产者:
 php emit_log.php error "routingKey error"
 php emit_log.php warning "routingKey warning"
 php emit_log.php info "routingKey info"
 php emit_log.php info warning error "routingKey info warning error"

 # 消费者:
 # 消费者可以执行多个终端
 php receive_logs.php info warning error
 php receive_logs.php info
 php receive_logs.php warning
 php receive_logs.php warning error

topic

direct 类型的 Exchange 路由规则是完全匹配 binding key 与 routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic 类型的 Exchange 在匹配规则上进行了扩展,它与 direct 类型的 Exchage 相似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同。官方解释:topic。它约定:

  • routing key 为一个英文句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key 与 routing key 一样也是句点号“. ”分隔的字符串
  • binding key 中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个

图 3 topic Exchange

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到 Q1 与 Q2,routingKey=”lazy.orange.fox”的消息会路由到 Q1,routingKey=”lazy.brown.fox”的消息会路由到 Q2,routingKey=”lazy.pink.rabbit”的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何 bindingKey。

来看下官方给出的代码示例:

emit_log.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, $exchange_name, $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, $exchange_name, $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

执行 shell 命令,可自行测试:

# 所有
php receive_logs.php "#"

# 路由到 Q1 与 Q2
php receive_logs.php "quick.orange.rabbit"

# 路由到 Q1
php receive_logs.php "lazy.orange.fox"

# 路由到 Q2
php receive_logs.php "lazy.brown.fox*"

# 路由到 Q1 与 Q2
php emit_log.php "quick.orange.rabbit" "Route to Q1 and Q2 at the same time"

headers

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消
息,而是根据发送的消息内容中的 headers 属性进行匹配。

在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 Exchange 时,RabbitMQ 会取
到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配 Queue 与
Exchange 绑定时指定的键值对;如果完全匹配则消息会路由到该 Queue,否则不会路由到该
Queue。

headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

03-05 20:20