这是我正在尝试做的事情:
出消息
对消息进行操作
如果操作失败,则将消息放回队列
如果操作成功,请确认消息
我现在的问题是,如果操作失败,则不会重新排队该消息,而是保持未确认状态。如果进入RabbitMQ Web配置界面,即使basic.Nack已被跳过,但我仍看到消息被标记为未确认。
var delivery = subscription.Next();
var messageBody = delivery.Body;
try
{
action.Invoke(messageBody);
subscription.Ack(delivery);
}
catch (Exception ex)
{
subscription.Model.BasicNack(delivery.DeliveryTag, false, true);
throw ex;
}
更新:
因此,我注意到消息从“就绪”变为“未确认”的速度非常快。一个速率快得多,然后我实际上调用了Subscriber.Next(),就好像.Net客户端将所有消息缓存在内存中(我的应用程序的内存占用量实际上增长得很快),并从内存中处理这些消息并随后发送Ack(),取消标记来自未确认的消息。
更新2:
好像排空队列真的很快,是因为我没有在Model上设置BasicQos。以下内容修复了所有问题。 Basic.Nack()似乎仍然无法工作:
Model.BasicQos(0,1,假)
最佳答案
我怀疑您正在使用:channel.BasicConsume(your_queue_name, false, consumer);
检索消息。
我使用RabbitMQ 3.2.4服务器和客户端运行了多个测试。我无法使channel.BasickAck(...)
或channel.BasicNack(...)
正常工作。
也就是说,我能够获得预期的Ack |使用时出现小问题:BasicGetResult result = channel.BasicGet(your_queue_name, false);
因此,您可能需要考虑使用其他检索方法来获取消息。我认识到Consume&Dequeue是“首选”方法,但在我的情况下它们不起作用。我想一次公平的,一次确认的派遣。使用BasicGet是实现此目标的唯一方法。
这种方法的缺点是您可能会丢失与subscription.Next()
一起使用的客户端事件迭代器。
如果我不得不大胆猜测,我认为有关本地Queue集合的某些事情会破坏该通道提供确认的能力。值得指出的是,使用new QueueingBasicConsumer(channel);
创建使用者时会触发调用以从服务器队列中预提取事件。使用者的Queue只是一个SharedQueue<RabbitMQ.Client.Events.BasicDeliverEventArgs>
,而SharedQueue只是IEnumerable的扩展。
还请记住,提取消息的相同通道也需要提供Ack |没关系您无法确认|取消来自其他渠道的消息。或者至少我还没有弄清楚该怎么做,也没有其他人。这是一个问题,如果您将RabbitMQ对象包装在using语句中(这样就不会留下网络资源),并且您需要经过很长的流程才能安全地进行确认。
此SO Answer安排了一个体面的工作流,以解决可能的现实情况,即您的提货渠道将不再是发送Ack | Ack的渠道。没关系诀窍是设置TTL,而不会打扰发送Nack-只让新消息过期并自动重新排队。
关于c# - 基本.Nack未处理,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/21244281/