如果您使用相同的端点名称创建发布者和使用者,我们遇到了 MassTransit 丢失消息的情况。
注意下面的代码;如果我为消费者或发布者使用不同的端点名称(例如,发布者使用“rabbitmq://localhost/mtlossPublised”),那么该消息将同时计算发布和消费匹配;如果我使用相同的端点名称(如示例中所示),那么消耗的消息比发布的消息少。
这是预期的行为吗?或者我做错了什么,下面的工作示例代码。
using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MTMessageLoss
{
class Program
{
static void Main(string[] args)
{
var consumerBus = ServiceBusFactory.New(b =>
{
b.UseRabbitMq();
b.UseRabbitMqRouting();
b.ReceiveFrom("rabbitmq://localhost/mtloss");
});
var publisherBus = ServiceBusFactory.New(b =>
{
b.UseRabbitMq();
b.UseRabbitMqRouting();
b.ReceiveFrom("rabbitmq://localhost/mtloss");
});
consumerBus.SubscribeConsumer(() => new MessageConsumer());
for (int i = 0; i < 10; i++)
publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) });
Console.WriteLine("Press ENTER Key to see how many you consumed");
Console.ReadLine();
Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
Console.ReadLine();
consumerBus.Dispose();
publisherBus.Dispose();
}
}
public interface ISimpleMessage : CorrelatedBy<Guid>
{
string Message { get; }
}
public class SimpleMessage : ISimpleMessage
{
public Guid CorrelationId { get; set; }
public string Message { get; set; }
}
public class MessageConsumer : Consumes<ISimpleMessage>.All
{
public static int Count = 0;
public void Consume(ISimpleMessage message)
{
System.Threading.Interlocked.Increment(ref Count);
}
}
}
最佳答案
最重要的是,总线的每个实例都需要它自己的队列来读取。即使总线只是为了发布消息而存在。这只是 MassTransit 工作方式的要求。
http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - 查看警告。
当两个总线实例共享同一个队列时,我们将行为保留为未定义。无论如何,这不是我们支持的条件。每个总线实例可能会向其他总线实例发送元数据,并且需要它自己的端点。这对 MSMQ 来说是一个更大的交易,所以也许我们可以让这个案例在 RabbitMQ 上工作——但这不是我们在这一点上花太多时间考虑的事情。
关于rabbitmq - MassTransit 丢失消息 - Rabbit MQ - 当发布者和消费者端点名称相同时,,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/12458175/