Why is my MassTransit Fault consumer not being called?

Question

I am using MassTransit 7.1.4.0 together with RabbitMQ 3.8.7.

I have a consumer that simply throws an exception, and a fault consumer that uses Console.WriteLine to report that a fault occurred.

The fault consumer is never called. I also tried registering it with a fault consumer definition, but that does not work either.

Here is the entire program.

    using System;
    using System.Threading.Tasks;
    using GreenPipes;
    using MassTransit;
    using MassTransit.ConsumeConfigurators;
    using MassTransit.Definition;
    using Microsoft.Extensions.DependencyInjection;

    namespace FaultConsumer
    {
      class Program
      {
        static async Task Main(string[] args)
        {
          Console.WriteLine("Hello Fault Consumer!");

          var services = new ServiceCollection();

          services.AddMassTransit(x =>
          {

            x.AddConsumer<MyConsumer>(typeof(MyConsumerDefinition));
            x.AddConsumer<MyFaultConsumer>();//typeof(MyFaultConsumerDefinition));
            x.SetKebabCaseEndpointNameFormatter();
            x.UsingRabbitMq((context, cfg) =>
            {
              cfg.Host("rabbitmq://localhost");
              cfg.ConfigureEndpoints(context);
            });

          });

          var serviceProvider = services.BuildServiceProvider();
          var bus = serviceProvider.GetRequiredService<IBusControl>();

          await bus.StartAsync();
          await bus.Publish(new SubmitOrder() { DateTimeStamp = DateTime.Now });

          Console.WriteLine("Press any key to exit");
          await Task.Run(() => Console.ReadKey());

          await bus.StopAsync();

        }
      }

      class SubmitOrder
      {
        public DateTime DateTimeStamp { get; set; }
      }

      class MyConsumer : IConsumer<SubmitOrder>
      {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
          Console.WriteLine($"Attempting to consume {context.GetRetryCount()} {context.GetRetryAttempt()}");
          throw new Exception(context.GetRetryCount().ToString());
        }
      }

      class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
      {
        public MyConsumerDefinition()
        {
          EndpointName = "order-service-202102081221";

          ConcurrentMessageLimit = 8;
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
              IConsumerConfigurator<MyConsumer> consumerConfigurator)
        {
          //endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
          endpointConfigurator.UseMessageRetry(r => r.Immediate(5));

          endpointConfigurator.UseInMemoryOutbox();
        }
      }

      class MyFaultConsumer : IConsumer<Fault<MyConsumer>>
      {
        public async Task Consume(ConsumeContext<Fault<MyConsumer>> context)
        {
          Console.WriteLine("Fault");
          await Task.CompletedTask;
        }

      }

      //   class MyFaultConsumerDefinition : ConsumerDefinition<MyFaultConsumer>
      //   {
      //     public MyFaultConsumerDefinition()
      //     {
      //       // override the default endpoint name
      //       EndpointName = "order-service-faults-202102081221";

      //       // limit the number of messages consumed concurrently
      //       // this applies to the consumer only, not the endpoint
      //       ConcurrentMessageLimit = 8;
      //     }

      //     protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
      //           IConsumerConfigurator<MyFaultConsumer> consumerConfigurator)
      //     {
      //       // configure message retry with millisecond intervals
      //       endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));

      //       // use the outbox to prevent duplicate events from being published
      //       endpointConfigurator.UseInMemoryOutbox();
      //     }
      //   }
    }

Recommended answer

Faults are published based on message type, not consumer type.

Your fault consumer should be consuming Fault<SubmitOrder>.

    class MyFaultConsumer :
        IConsumer<Fault<SubmitOrder>>
    {
        public Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
        {
            Console.WriteLine("Fault");

            return Task.CompletedTask;
        }
    }
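
Because the fault is keyed to the message type, the fault consumer also receives the original message and the exception details. The following is a minimal sketch, not part of the original answer: the class name DetailedFaultConsumer is hypothetical, and SubmitOrder is repeated from the question only so the block stands on its own.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Same message class as in the question; omit this copy when adding
// the consumer to the original program.
class SubmitOrder
{
    public DateTime DateTimeStamp { get; set; }
}

// Hypothetical fault consumer that prints what failed and why.
class DetailedFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        Fault<SubmitOrder> fault = context.Message;

        // The original message that the failing consumer could not process.
        Console.WriteLine($"Fault for order stamped {fault.Message.DateTimeStamp:O}");

        // Exception details captured when the failing consumer threw.
        foreach (ExceptionInfo exception in fault.Exceptions)
            Console.WriteLine($"  {exception.ExceptionType}: {exception.Message}");

        return Task.CompletedTask;
    }
}
```

It would be registered the same way as in the question, with x.AddConsumer<DetailedFaultConsumer>() in place of x.AddConsumer<MyFaultConsumer>(). With the question's UseMessageRetry(r => r.Immediate(5)) configuration, the fault should only arrive once the retries are exhausted, so expect several "Attempting to consume" lines before the fault output.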
