问题描述
当我在rpc-pattern中声明一个临时回复队列为独占(例如匿名队列(exclusive=true, autodelete=true))时,响应消息无法发布到指定的回复队列(例如message.replyTo="amq.gen-Jg_tv8QYxtEQhq0tF30vAA") 因为 RabbitMqProducer.PublishMessage() 尝试使用不同的参数(exclusive=false)重新声明队列,这可以理解地导致错误.
When I declare a temporary reply queue to be exclusive (e.g. anonymous queue (exclusive=true, autodelete=true) in rpc-pattern), the response message cannot be posted to the specified reply queue (e.g. message.replyTo="amq.gen-Jg_tv8QYxtEQhq0tF30vAA") because RabbitMqProducer.PublishMessage() tries to redeclare the queue with different parameters (exclusive=false), which understandably results in an error.
不幸的是,在 RabbitMqProducer.PublishMessage() 中对 channel.RegisterQueue(queueName) 的错误调用似乎在传入队列中取消了请求消息,因此,当 ServiceStack.Messaging.MessageHandler.DefaultInExceptionHandler 尝试确认请求消息(以将其从传入队列中删除),消息只会停留在传入队列的顶部并重新处理.这个过程无限重复,每次迭代都会产生一个 dlq 消息,这会慢慢填满 dlq.
Unfortunately, the erroneous call to channel.RegisterQueue(queueName) in RabbitMqProducer.PublishMessage() seems to nack the request message in the incoming queue so that, when ServiceStack.Messaging.MessageHandler.DefaultInExceptionHandler tries to acknowlege the request message (to remove it from the incoming queue), the message just stays on top of the incoming queue and gets processed all over again. This procedure repeats indefinitely and results in one dlq-message per iteration which slowly fills up the dlq.
我在想,
- 如果ServiceStack处理了这种情况,当ServiceStack.RabbitMq.RabbitMqProducer无法正确声明响应队列时
- 如果 ServiceStack.RabbitMq.RabbitMqProducer 总是在发布响应之前声明响应队列
- 如果最好有一些配置标志来省略所有交换和队列声明调用(在第一次初始化之外).RabbitMqProducer 只会假设每个队列/交换都已正确设置并发布消息.
(目前我们的客户端只是将其响应队列声明为exclusive=false 并且一切正常.但我真的很想使用rabbitmq 的内置临时队列.)
(At the moment our client just declares its response queue to be exclusive=false and everything works fine. But I'd really like to use rabbitmq's built-in temporary queues.)
MQ-Client 代码,需要简单的SayHello"服务:
MQ-Client Code, requires simple "SayHello" service:
const string INQ_QUEUE_NAME = "mq:SayHello.inq";
const string EXCHANGE_NAME="mx.servicestack";
var factory = new ConnectionFactory() { HostName = "192.168.179.110" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// Create temporary queue and setup bindings
// this works (because "mq:tmp:" stops RabbitMqProducer from redeclaring response queue)
string responseQueueName = "mq:tmp:SayHello_" + Guid.NewGuid().ToString() + ".inq";
channel.QueueDeclare(responseQueueName, false, false, true, null);
// this does NOT work (RabbitMqProducer tries to declare queue again => error):
//string responseQueueName = Guid.NewGuid().ToString() + ".inq";
//channel.QueueDeclare(responseQueueName, false, false, true, null);
// this does NOT work either (RabbitMqProducer tries to declare queue again => error)
//var responseQueueName = channel.QueueDeclare().QueueName;
// publish simple SayHello-Request to standard servicestack exchange ("mx.servicestack") with routing key "mq:SayHello.inq":
var props = channel.CreateBasicProperties();
props.ReplyTo = responseQueueName;
channel.BasicPublish(EXCHANGE_NAME, INQ_QUEUE_NAME, props, Encoding.UTF8.GetBytes("{\"ToName\": \"Chris\"}"));
// consume response from response queue
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
// print result: should be "Hello, Chris!"
Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
}
}
当 RabbitMqProducer 不尝试声明队列时,一切似乎都正常,如下所示:
Everything seems to work fine when RabbitMqProducer does not try to declare the queues, like that:
public void PublishMessage(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
{
const bool MustDeclareQueue = false; // new config parameter??
try
{
if (MustDeclareQueue && !Queues.Contains(routingKey))
{
Channel.RegisterQueueByName(routingKey);
Queues = new HashSet<string>(Queues) { routingKey };
}
Channel.BasicPublish(exchange, routingKey, basicProperties, body);
}
catch (OperationInterruptedException ex)
{
if (ex.Is404())
{
Channel.RegisterExchangeByName(exchange);
Channel.BasicPublish(exchange, routingKey, basicProperties, body);
}
throw;
}
}
推荐答案
这个问题在 servicestack 的 v4.0.32 版本中得到了解决(在这个 提交).
The issue got adressed in servicestack's version v4.0.32 (fixed in this commit).
RabbitMqProducer 不再尝试重新声明临时队列,而是假设回复队列已经存在(这解决了我的问题.)
The RabbitMqProducer no longer tries to redeclare temporary queues and instead assumes that the reply queue already exist (which solves my problem.)
(无限循环的根本原因(发布响应消息时错误处理)可能仍然存在.)
(The underlying cause of the infinite loop (wrong error handling while publishing response message) probably still exists.)
示例
以下基本 mq-client(不使用 ServiceStackmq 客户端,而是直接依赖于 rabbitmq 的 .net-library;但它使用 ServiceStack.Text 进行序列化)可以执行通用 RPC:
The following basic mq-client (which does not use ServiceStackmq client and instead depends directly on rabbitmq's .net-library; it uses ServiceStack.Text for serialization though) can perform generic RPCs:
public class MqClient : IDisposable
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "192.168.97.201",
UserName = "guest",
Password = "guest",
//VirtualHost = "test",
Port = AmqpTcpEndpoint.UseDefaultPort,
};
private IConnection connection;
private string exchangeName;
public MqClient(string defaultExchange)
{
this.exchangeName = defaultExchange;
this.connection = factory.CreateConnection();
}
public TResponse RpcCall<TResponse>(IReturn<TResponse> reqDto, string exchange = null)
{
using (var channel = connection.CreateModel())
{
string inq_queue_name = string.Format("mq:{0}.inq", reqDto.GetType().Name);
string responseQueueName = channel.QueueDeclare().QueueName;
var props = channel.CreateBasicProperties();
props.ReplyTo = responseQueueName;
var message = ServiceStack.Text.JsonSerializer.SerializeToString(reqDto);
channel.BasicPublish(exchange ?? this.exchangeName, inq_queue_name, props, UTF8Encoding.UTF8.GetBytes(message));
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//channel.BasicAck(ea.DeliveryTag, false);
string response = UTF8Encoding.UTF8.GetString(ea.Body);
string responseType = ea.BasicProperties.Type;
Console.WriteLine(" [x] New Message of Type '{1}' Received:{2}{0}", response, responseType, Environment.NewLine);
return ServiceStack.Text.JsonSerializer.DeserializeFromString<TResponse>(response);
}
}
~MqClient()
{
this.Dispose();
}
public void Dispose()
{
if (connection != null)
{
this.connection.Dispose();
this.connection = null;
}
}
}
要点:
- 客户端声明匿名队列(=空队列名称)
channel.QueueDeclare()
- 服务器生成队列并返回队列名称(amq.gen*)
- 客户端将队列名称添加到消息属性中 (
props.ReplyTo = responseQueueName;
) - ServiceStack 自动向临时队列发送响应
- 客户端接收响应并反序列化
可以这样使用:
using (var mqClient = new MqClient("mx.servicestack"))
{
var pingResponse = mqClient.RpcCall<PingResponse>(new Ping { });
}
重要提示:您必须使用 servicestack 版本 4.0.32+.
Important: You've got to use servicestack version 4.0.32+.
这篇关于Servicestack RabbitMQ:当 RabbitMqProducer 无法以 RPC 模式重新声明临时队列时,无限循环会填满死信队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!