更多的问题#
Direct Exchange帮助我们解决了分类发布与订阅消息的问题,但是Direct Exchange的问题是,它所使用的routingKey是一个简单字符串,这决定了它只能按照一个条件进行分类。
比如RabbitMQ学习笔记(四)Routing中的列子,我们是按照新闻的类型分类的,分为game, sport, music。
如果game下面有ff7,ff8的子分类,sport下面有soccer, basketball的分类。现在我们要求消费者程序接收game下面ff7子类别的所有新闻,sport下面的soccer子类别的所有新闻 ,以及所有的music新闻,单单使用Direct Exchange不能帮助我们解决这个问题。
Topic Exchange#
Topic Exchange和Direct Exchange的用法基本一样。
他们的区别是, Topic Exchange在指定routingKey的时候,必须使用一个单词列表,每个单词之间用”.”间隔, 例如game.ff7, sport.soccer, Topic Exchange使用的routingKey的最大长度是256字节。
而且在Topic Exchange的routingKey中可以使用2种通配符
- * 匹配任何一个单词, 例* 表示所有的游戏新闻,game.ff7, game.ff8可以匹配,但是game.ff7.music就不能匹配,因为只能匹配一个单词。
- # 匹配零个或多个单词# 表示也表示游戏所有的新闻,game.ff7,game.ff8可以匹配,game.ff7.music也可以匹配。
你甚至可以这样写*.*.rabbit,表示所有3级新闻类别兔子的新闻,zoo.grass.rabbit和zoo.cage.rabbit都可以匹配
修改代码#
Send#
- 使用Topic Exchange
- 发送消息需要指定新闻分类 ,命令格式“dotnet run [分类路径] [消息信息]”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | static void Main( string [] args) { var messageType = args[0]; var factory = new ConnectionFactory() { HostName = "localhost" }; using ( var connection = factory.CreateConnection()) { using ( var channel = connection.CreateModel()) { channel.ExchangeDeclare( "topicExchange" , "topic" ); string message = $ "{args[0]} news: {args[1]}" ; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topicExchange" , routingKey: messageType, basicProperties: null , body: body ); Console.WriteLine( "[x] Sent {0}" , message); } } } |
Receive#
- 使用Topic Exchange
- 启动程序时,需要设置希望订阅的新闻分类, 命令格式“dotnet run [新闻分类1] [新闻分类2] ….”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | static void Main( string [] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using ( var connection = factory.CreateConnection()) { using ( var channel = connection.CreateModel()) { channel.ExchangeDeclare( "topicExchange" , "topic" ); var queueName = channel.QueueDeclare().QueueName; foreach ( var type in args) { channel.QueueBind(queue: queueName, exchange: "topicExchange" , routingKey: type); } var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine( "[x] Received {0}" , message); }; channel.BasicConsume(queue: queueName, autoAck: true , consumer: consumer); Console.Read(); } } } |
最终效果#
- 启动一个Send程序,2个Receive程序
- 一个Receive程序订阅ff7下面的所有新闻(包含ff7下面所有的子类别新闻),soccer下面的所有新闻(包含soccer下面的所有子类别新闻),music下的所有新闻(不包括子类别新闻)
- 一个Receive程序订阅ff8下面的所有新闻(包含ff8下面所有的子类别新闻),basketball下面的所有新闻(包含basketball下面的所有子类别新闻)。