问题描述
我有一个带有标准服务栈RabbitMQ抽象的服务栈服务。消息队列是为我的MyRequest型自动创建的,并且我已经设置了一个服务方法来处理来自MyRequest.In Queue的请求
我预计如果我在此方法中抛出异常,消息将被放入死信队列。但是,它们只是从入站队列中删除,不会进入死信队列
public class MyOtherService : AsmServiceBase
{
public void Any(MyRequest request)
{
try
{
throw new InvalidOperationException("this is an invalid operation");
}
catch (Exception ex)
{
Console.Write("exceptions");
throw;
}
}
}
[Route("/api/myrequest", "POST")]
public class MyRequest : HeliosRequestBase<MyResponse>
{
public string Content { get; set; }
}
public class MyResponse : HeliosResponseBase
{
}
这是AppHost中的代码,它将myRequest消息路由到我的服务方法:
RabbitMqServer mqServer = RabbitMqServerFactory
.GetRabbitMqServer(m_ServiceDiscovery).Result;
mqServer.RegisterHandler<MyRequest>(ExecuteMessage);
mqServer.Start();
谁能解释一下我做错了什么?
如果我将RegisterHandler行更改为这样
mqServer.RegisterHandler<MyRequest>(
x =>
{
try
{
object resp = ExecuteMessage(x);
//if I throw an exception here it will go to the DLQ
return resp;
}
catch (Exception ex)
{
throw;
}
}
);
我可以看到异常包含在从ExecuteMessage返回的对象中。我想也许我需要把它重新扔出去?还是我做错了什么?
更新2-发布消息的方式是问题在Mythz示例的帮助下,我已经能够确定该问题是由消息发布到队列的方式引起的。
我们这样调用发布方法:
public void PublishOneWay<TRequest>(TRequest request, string queueName, int timeoutInMilliseconds)
{
if (m_Disposed)
throw new ObjectDisposedException("The service client has been disposed and cannot be used.");
Message message = MakeMessage(request);
//Updated to just send one-way messages - why bother enquing if we are never going to capture the callback/block on this thread?
//MessagesToProcess.Add(new Tuple<string, Message, MessageDirection>(queueName, message, MessageDirection.OneWay));
MqClient.SendOneWay(queueName, message); //will work if I send request instead of message
}
问题似乎出在我们构造要发送的新消息对象的代码中。
private Message MakeMessage<TRequest>(TRequest request)
{
Guid messageId = Guid.NewGuid();
Message newMessage = new Message<TRequest>(request) { ReplyTo = ResponseQueueName, Id = messageId };
newMessage.Meta = new Dictionary<string, string>();
newMessage.Meta.Add(ServiceContextConstants.TrackingIdentifier, AsmServiceContext.TrackingIdentifierData?.Value);
newMessage.Meta.Add(ServiceContextConstants.SessionContext, AsmServiceContext.SessionContextData?.Value);
return newMessage;
}
如果我发送原始的请求对象,则如您所料,异常将被发送到DLQ。如果我没有设置Message对象的ReplyTo属性,它也会起作用。
我想知道,我需要在Message对象上设置什么属性来创建此行为吗?我可能无法设置ReplyTo属性,但不确定结果会更改多少代码。推荐答案
编辑
如果您使用的是显式ReplyTo
地址,则所有错误都将发送到该ReplyTo地址,而不是DLQ。如果您的响应DTO具有ResponseStatus
属性,则异常将填充到响应DTO的ResponseStatus中,否则您可以使用泛型ErrorResponse
DTO读取异常信息,例如:
var requestMsg = new Message<ThrowVoid>(request)
{
ReplyTo = $"mq:{request.GetType().Name}.replyto"
};
mqProducer.Publish(requestMsg);
var msg = mqClient.Get<ErrorResponse>(requestMsg.ReplyTo, null);
mqClient.Ack(msg);
msg.GetBody().ResponseStatus.ErrorCode //= InvalidOperationException
我无法仅使用正常的ServiceStack类再现此问题,如this commit所示,它在所有MQ服务器中都能正常工作。
我已经提取了使用ServiceStack's RabbitMQ Server的代码如下:
public class ThrowVoid
{
public string Content { get; set; }
}
public class TestMqService : Service
{
public void Any(ThrowVoid request)
{
throw new InvalidOperationException("this is an invalid operation");
}
}
public class AppHost : AppSelfHostBase
{
public AppHost(Func<IMessageService> createMqServerFn)
: base(nameof(TestMqService), typeof(TestMqService).Assembly) {}
public override void Configure(Container container)
{
var mqServer = new RabbitMqServer { RetryCount = 1 };
container.Register<IMessageService>(c => mqServer);
mqServer.RegisterHandler<ThrowVoid>(ExecuteMessage);
AfterInitCallbacks.Add(appHost => mqServer.Start());
}
}
邮件发送位置:
using (var mqFactory = appHost.TryResolve<IMessageFactory>())
{
var request = new ThrowVoid { Content = "Test" };
using (var mqProducer = mqFactory.CreateMessageProducer())
using (var mqClient = mqFactory.CreateMessageQueueClient())
{
mqProducer.Publish(request);
var msg = mqClient.Get<ThrowVoid>(QueueNames<ThrowVoid>.Dlq, null);
mqClient.Ack(msg);
Assert.That(msg.Error.ErrorCode, Is.EqualTo("InvalidOperationException"));
}
}
不可能知道您的问题是什么,很多实现隐藏在您自己的自定义类后面,但我建议从上面这样一个确实有效的小独立示例开始,然后慢慢添加您的自定义代码,以确定导致问题的原因。
虽然我会recommend against using inheritance to hide properties in your DTOs,但DTO是用于定义服务契约的声明性模型,将它们的属性隐藏在继承后面会使任何阅读它的人更难确切地知道每个服务接受和返回什么。继承对于定义可重用的功能很有用,但DTO是声明性的,不应该有实现,因此您基本上是在添加不必要的耦合来隐藏显式的服务契约,这使得通过查看服务契约来推断每个服务的功能变得更加困难。这篇关于我的服务堆栈服务中的异常不会将消息移动到死信队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!