目录

RabbitMQ --- Hello Mr.Tua

RabbitMQ --- Work Queues(工作队列)

RabbitMQ --- Publish/Subscribe(发布/订阅)

前言

在上一章中介绍了 Publish/Subscribe(发布/订阅),它是把每个消息发送给多个 Consumer,也就是说每个 Consumer 都是接收所有的消息,辣么问题来了,如果 Consumer 只接收它想要的某一部分消息,那该怎么办呢?可以通过 Routing(路由)的机制来实现。

Direct交换机(Direct exchange)

在上一章的示例中通过 Fanout exchange 把所有消息广播到多个 Consumer,这样是无法满足 Consumer 自定义接收消息的需求,为了解决这个问题就需要使用 Direct exchange ,它会使消息转发到 Routing key 和 Binding key 完全匹配的队列,而这两者不匹配的消息都会被丢弃。

RabbitMQ --- Routing(路由)-LMLPHP

也可以用相同的 Binding key 绑定多个队列,看上去和 Fanout exchange 的作用一样。

RabbitMQ --- Routing(路由)-LMLPHP

完整示例

现在修改上一章的示例代码,使 Consumer 可以自定义接收奇数(odd)或偶数(even)消息:

using RabbitMQ.Client;
using System;
using System.Text; namespace Producer
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory
{
HostName = "10.202.228.107",
UserName = "Tua",
Password = "Tua",
Port =
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare
(
exchange: "Tua",
type: ExchangeType.Direct//Direct交换机
);
for (int m = ; m < ; m++)
{
string marks = string.Empty;
for (int n = ; n <= m; n++)
{
marks += ">";
}
string routingKey = string.Empty;
if(marks.Length % != )
{
routingKey = "odd";//奇数
}
else
{
routingKey = "even";//偶数
}
string msg = "Mr.Tua" + marks + marks.Length + "s";
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish
(
exchange: "Tua",
routingKey: routingKey,//路由键
basicProperties: null,
body: body
);
Console.WriteLine("Producer sent message: {0}", msg);
}
Console.ReadLine();
}
}
}
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Text;
using System.Threading; namespace Consumer
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory
{
HostName = "localhost"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare
(
exchange: "Tua",
type: ExchangeType.Direct//Direct交换机
);
string queueName = channel.QueueDeclare().QueueName;
string[] bindingKeys = { "odd", "even" };
Random random = new Random();
int index = random.Next();
string bindingKey = bindingKeys[index];//随机生成绑定键
channel.QueueBind
(
queue: queueName,
exchange: "Tua",
routingKey: bindingKey//绑定键
);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body;
var msg = Encoding.UTF8.GetString(body);
int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count();
Console.WriteLine("Consumer received {0} message: {1}", bindingKey, msg);
Thread.Sleep(marks * );
Console.WriteLine("OK");
};
channel.BasicConsume
(
queue: queueName,
noAck: true,
consumer: consumer
);
Console.ReadLine();
}
}
}
}
}

RabbitMQ --- Routing(路由)-LMLPHP

04-17 16:52