问题描述
我正在尝试在一个非常简单的控制台应用程序上使用手动 ACK,但我无法让它工作.
I'm trying to use a manual ACK on a very simple console application, but I can't make it work.
在发件人,我有以下代码:
On the sender, I have the following code:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.ConfirmSelect();
channel.BasicAcks += (sender, e) =>
{
Console.Write("ACK received");
};
var properties = channel.CreateBasicProperties();
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
在接收器上,我有以下代码:
On the receiver I have the following code:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.ConfirmSelect();
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
我期望的是,当我在接收器上调用 channel.BasicAck()
时,发送器上的事件 BasicAcks
会被触发,但是当消息在 consumer.Received
之前传递给客户端.
What I expect is that the event BasicAcks
on the sender is fired when I call channel.BasicAck()
on the receiver, but that event is being fired when the message is delivered to the client, before consumer.Received
.
我期望的是正确的行为还是我遗漏了什么?
Is what I'm expecting the correct behavior or am I missing something?
推荐答案
您的期望不正确.BasicAcks
是关于发布者确认,而不是关于接收者的确认.因此,您向代理发布消息,broker(因此,RabbitMQ 本身)将在处理此消息时(例如 - 将其写入磁盘以获取持久消息时)确认或确认(否定确认)您, 或当 in 将其放入队列中).请注意,这里不涉及接收者 - 完全在发布者和 RabbitMQ 之间.
Your expectation is not correct. BasicAcks
is about publisher confirms, not about ack from receiver. So you publish a message to broker and broker (so, RabbitMQ itself) will ack or nack (negative acknowledge) you when it handles this message (for example - when it will write it to disk for persistent messages, or when in puts it in queue). Note that no receiver is involved here - it's entirely between publisher and RabbitMQ.
现在,当您在接收方确认消息时 - 这再次仅在接收方和 RabbitMQ 之间 - 您告诉兔子该消息已被处理并且可以安全地删除.这样做是为了处理接收器在处理过程中崩溃的情况 - 然后 rabbit 将能够将此消息传递给下一个接收器(如果有).
Now when you Ack message at receiver - that's again only between receiver and RabbitMQ - you tell rabbit that message is processed and can be safely deleted. This is done to handle situations when receiver crashes during processing - then rabbit will be able to deliver this message to the next receiver (if any).
请注意,此类架构的全部目的是将发布者和接收者分开 - 他们不应相互依赖.
Note that the whole purpose of such arcitecture is to separate publishers and receivers - they should not be dependent on each other.
如果您有一个接收器(可以有多个)并且您想确保它处理您的消息 - 使用 RPC 模式:发送消息并等待来自该接收器的另一条消息.
If you have one receiver (there can be many) and you want to ensure it processed your message - use RPC pattern: send message and wait for another message back from this receiver.
这篇关于c# 客户端上的 RabbitMQ 手动 ACK的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!