1、安装rabbitmq
怎么安装rabbitmq请查看之前课程,如果已经安装,请略过此步。
2、创建vendor文件夹或是直接采用PHP框架
mkdir vendor
3、进入文件
cd vendor
4、安装php扩展
composer require php-amqplib/php-amqplib
5、进入上级创建topic文件夹
cd ../
mkdir topic
6、进入topic文件并创建生产者php
cd topic
touch publish.php
7、输入topic生产者内容
<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$v_host = 'order';
$exc_name ='topic_log';
$routing_key = 'goods.warn';
$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'topic',false,false,false);
$data = 'this is '.$routing_key.' message';
$msg = new AMQPMessage($data,['delivery_mode'=>AMQPMEssage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg,$exc_name,$routing_key);
$channel->close();
$connection->close();
8、创建消费者php
touch all.php
touch user.php
touch warn.php
9、输入topic消费者内容
vi all.php
<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$v_host ='order';
$exc_name = 'topic_log';
$routing_key = '#';
$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'topic',false,false,false);
list($queue_name,,) = $channel->queue_declare('',false,false,true,false);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function($msg){
echo 'received ' ,$msg->body,"\n";
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
vi user.php
<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$v_host ='order';
$exc_name = 'topic_log';
$routing_key = 'user.*';
$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'topic',false,false,false);
list($queue_name,,) = $channel->queue_declare('',false,false,true,false);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function($msg){
echo 'received ' ,$msg->body,"\n";
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
vi warn.php
<?php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$v_host ='order';
$exc_name = 'topic_log';
$routing_key = '*.warn';
$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'topic',false,false,false);
list($queue_name,,) = $channel->queue_declare('',false,false,true,false);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function($msg){
echo 'received ' ,$msg->body,"\n";
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
10、执行消费者
php all.php
php user.php
php warn.php
11、执行生产者
php publish.php
此时你会发现只有warn.php对应的消费者及all.php都有数据,如果你把publish.php生产者里面的$routing_key对应的值改成user.goods,你会发现user.php及all.php对应的消费者都有数据。这就是rabbitmq topic工作模式硬实力