本文介绍了RabbitMQ-为什么错误的订户会收到已发布的消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两项服务,经理收藏家.

  1. 管理器已通过routingKey user.collected订阅了队列COLLECTED_USER,并调用了UserCollected处理程序.
  2. 收集器已通过routingKey user.collect订阅了队列COLLECT_USER,并调用了CollectUser处理程序.
  1. Manager is subscribed to Queue COLLECTED_USER with routingKey user.collected and invokes a UserCollected handler.
  2. Collector is subscribed to Queue COLLECT_USER with routingKey user.collect and invokes a CollectUser handler.

可以有多个收集器,因此我已将exclusive设置为false(有关代码,请参见下文).

There can be multiple collectors so I have set exclusive to false (see below for code).

还有其他服务可以监听事件,例如

There are also other services that listen for events like

  • user.created
  • user.updated
  • user.deleted
  • user.created,
  • user.updated,
  • user.deleted

此外,还有一些服务可以监听更常见的事件,例如

In addition there are services that listen for more general events like

  • #.created
  • user.#
  • #.created
  • user.#

以此类推.

所以我正在使用topic交换器.

So I am using a topic exchange.

| exchange | type  | routingKey     | queueName      |
| -------- | ----- | -------------- | -------------  |
| MY_APP   | topic | user.collect   | COLLECT_USER   |
| MY_APP   | topic | user.collected | COLLECTED_USER |

会发生什么:

  1. 管理器发布带有routingKey user.collect
  2. 的消息
  3. 收集器获取user.collect消息并调用CollectUser处理程序
  4. 收集器的 CollectUser处理程序确实起作用,然后使用routingKey user.collected
  5. 发布消息
  6. 管理器获取user.collected消息并调用UserCollected处理程序
  1. Manager publishes message with routingKey user.collect
  2. Collector gets the user.collect message and invokes a CollectUser handler
  3. Collector's CollectUser handler does work, then publishes a message with routingKey user.collected
  4. Manager gets the user.collected message and invokes the UserCollected handler

实际发生的事情:

  1. 管理器发布带有routingKey user.collect的消息(正确)
  2. 收集器获取user.collect消息并调用CollectUser处理程序(正确)
  3. 管理器还会获取user.collect消息,并使用错误的数据调用UserCollected处理程序. (错误)
  4. 收集器的 CollectUser处理程序确实起作用,然后发布带有routingKey user.collected(正确)的消息
  5. 管理器获取user.collected消息并调用UserCollected处理程序(正确)
  1. Manager publishes message with routingKey user.collect (correct)
  2. Collector gets the user.collect message and invokes a CollectUser handler (correct)
  3. Manager also gets the user.collect message and invokes the UserCollected handler with the wrong data. (wrong)
  4. Collector's CollectUser handler does work, then publishes a message with routingKey user.collected (correct)
  5. Manager gets the user.collected message and invokes the UserCollected handler (correct)

我的问题

管理器为什么会得到user.collect消息,原因是:

My Question

Why does the Manager get the user.collect message, given:

  1. 它在COLLECTED_USER队列而不是COLLECT_USER队列上侦听,并且
  2. 正在COLLECT_USER队列上监听的 Collector 已经处理了该消息.
  1. It's listening on the COLLECTED_USER queue not the COLLECT_USER queue, and
  2. The Collector, which is listening on the COLLECT_USER queue, has already handled the message.

实施细节

我按如下方式创建订阅者和发布者(针对相关性进行了修剪)

Implementation details

I create the subscribers and publishers as follows (trimmed for relevance)

根据AMQP url和参数urlexchangetyperoutingKeyqueueNamehandler

given the AMQP url and params url, exchange, type, routingKey, queueName and handler

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)

创建发布者

鉴于AMQP url和参数urlexchangetype

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })

发布

给出channel和参数exchangeroutingKeymessage

await channel.publish(exchange, routingKey, message)

注意

此问题是.

Note

This question is a follow-on from RabbitMQ — Why are my Routing Keys being ignored when using topic exchange.

推荐答案

我终于弄清了我的问题所在.肮脏的交流.在尝试此操作时,我无意间添加了一个交换,该交换将消息路由到错误的队列,这引起了我的困惑.

I finally worked out what my problem was. A dirty exchange. While experimenting with this I'd inadvertently added an exchange that was routing messages to the wrong queue, and this was causing my confusion.

要解决此问题,我启动了RabbitMQ管理员GUI并删除了所有队列,然后让我的代码创建所需的队列.上面列出的代码没有问题.

To fix it I fired up the RabbitMQ admin GUI and deleted all of the queues and let my code create the ones it needed. There was no issue with the code as outlined above.

这篇关于RabbitMQ-为什么错误的订户会收到已发布的消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 03:53