Why is my MassTransit Fault consumer not called?
Problem description
I am using MassTransit 7.1.4.0 together with RabbitMQ 3.8.7.
I have a consumer that simply throws an exception, and a fault consumer that calls Console.WriteLine to report that a fault occurred.
The fault consumer is never called. I also tried registering it with a consumer definition, but that does not work either.
Here is the entire program.
using System;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
using Microsoft.Extensions.DependencyInjection;

namespace FaultConsumer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello Fault Consumer!");
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.AddConsumer<MyConsumer>(typeof(MyConsumerDefinition));
                x.AddConsumer<MyFaultConsumer>();//typeof(MyFaultConsumerDefinition));
                x.SetKebabCaseEndpointNameFormatter();
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("rabbitmq://localhost");
                    cfg.ConfigureEndpoints(context);
                });
            });
            var serviceProvider = services.BuildServiceProvider();
            var bus = serviceProvider.GetRequiredService<IBusControl>();
            await bus.StartAsync();
            await bus.Publish(new SubmitOrder() { DateTimeStamp = DateTime.Now });
            Console.WriteLine("Press any key to exit");
            await Task.Run(() => Console.ReadKey());
            await bus.StopAsync();
        }
    }

    class SubmitOrder
    {
        public DateTime DateTimeStamp { get; set; }
    }

    class MyConsumer : IConsumer<SubmitOrder>
    {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            Console.WriteLine($"Attempting to consume {context.GetRetryCount()} {context.GetRetryAttempt()}");
            throw new Exception(context.GetRetryCount().ToString());
        }
    }

    class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
    {
        public MyConsumerDefinition()
        {
            EndpointName = "order-service-202102081221";
            ConcurrentMessageLimit = 8;
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<MyConsumer> consumerConfigurator)
        {
            //endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
            endpointConfigurator.UseMessageRetry(r => r.Immediate(5));
            endpointConfigurator.UseInMemoryOutbox();
        }
    }

    class MyFaultConsumer : IConsumer<Fault<MyConsumer>>
    {
        public async Task Consume(ConsumeContext<Fault<MyConsumer>> context)
        {
            Console.WriteLine("Fault");
            await Task.CompletedTask;
        }
    }

    // class MyFaultConsumerDefinition : ConsumerDefinition<MyFaultConsumer>
    // {
    //     public MyFaultConsumerDefinition()
    //     {
    //         // override the default endpoint name
    //         EndpointName = "order-service-faults-202102081221";
    //         // limit the number of messages consumed concurrently
    //         // this applies to the consumer only, not the endpoint
    //         ConcurrentMessageLimit = 8;
    //     }
    //     protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
    //         IConsumerConfigurator<MyFaultConsumer> consumerConfigurator)
    //     {
    //         // configure message retry with millisecond intervals
    //         endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
    //         // use the outbox to prevent duplicate events from being published
    //         endpointConfigurator.UseInMemoryOutbox();
    //     }
    // }
}
Recommended answer
Faults are published based on message type, not consumer type.
Your fault consumer should be consuming Fault<SubmitOrder>, the fault for the message that failed, rather than Fault<MyConsumer>.
class MyFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        Console.WriteLine("Fault");
        return Task.CompletedTask;
    }
}
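To check that the fault really is delivered, it can help to print what it carries. This is a minimal sketch, not part of the original answer: DetailedFaultConsumer is a hypothetical name, and it assumes the SubmitOrder class from the question is in the same FaultConsumer namespace. It relies on Fault<T> exposing the original message (Message) and the recorded exceptions (Exceptions).

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace FaultConsumer
{
    // Hypothetical variant of MyFaultConsumer that reports the fault details.
    // SubmitOrder is assumed to be the message class from the question.
    class DetailedFaultConsumer :
        IConsumer<Fault<SubmitOrder>>
    {
        public Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
        {
            Fault<SubmitOrder> fault = context.Message;

            // The original message that the consumer failed to process.
            Console.WriteLine($"Fault for order stamped {fault.Message.DateTimeStamp:O}");

            // Each entry describes an exception recorded for the failed delivery.
            foreach (ExceptionInfo exception in fault.Exceptions)
                Console.WriteLine($"  {exception.ExceptionType}: {exception.Message}");

            return Task.CompletedTask;
        }
    }
}
```

It would be registered in place of the original fault consumer with x.AddConsumer<DetailedFaultConsumer>(). With the UseMessageRetry(r => r.Immediate(5)) policy from the question, the fault should only appear once the retries are exhausted, so expect several "Attempting to consume" lines before the fault output.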