问题描述
我在示例项目中使用了 MassTransit 和 RabbitMQ.我只是尝试发布一条消息,然后由 2 个独立的消费者使用它.但每次只有一名消费者随机开火!有什么问题?
这是发送方方法
await _publishEndpoint.Publish(new ConsultantFinishedEvent(50));
Publisher 中应用程序启动中的此配置:
#region 服务总线配置public static IServiceCollection AddEventBus(这个IServiceCollection服务,IConfiguration配置){services.AddMassTransit(x =>{RegisterMessageConsumers(x);RegisterRequestClients(x);x.UsingRabbitMq((context, cfg) =>{cfg.ConfigureEndpoints(上下文);cfg.Host(configuration[EventBusConnection"], h =>{h.用户名(配置[EventBusUserName"]);h.Password(configuration[EventBusPassword"]);});});});services.AddMassTransitHostedService();退货服务;}私有静态无效 RegisterMessageConsumers(IServiceCollectionBusConfigurator 服务){//service.AddConsumer();}#endregion
这是简单的消费者:
公共类ConsultantFinishedEventConsumer:IConsumer{公共任务消费(ConsumeContext上下文){int xx = 0;返回空;}}
还有我用于所有消费者初创公司的这个配置:
#region 服务总线public static IServiceCollection AddEventBus(这个IServiceCollection服务,IConfiguration配置){services.AddMassTransit(x =>{RegisterMessageConsumers(x);RegisterRequestClients(x);x.UsingRabbitMq((context, cfg) =>{cfg.ConfigureEndpoints(上下文);cfg.Host(configuration[EventBusConnection"], h =>{h.用户名(配置[EventBusUserName"]);h.Password(configuration[EventBusPassword"]);});//注册消费者cfg.ReceiveEndpoint(PatintQueue", ep =>{ep.PrefetchCount = 16;});});});services.AddMassTransitHostedService();退货服务;}私有静态无效 RegisterMessageConsumers(IServiceCollectionBusConfigurator 服务){service.AddConsumer();}
首先,您的发布者配置顺序需要稍作调整:
public static IServiceCollection AddEventBus(这个IServiceCollection服务,IConfiguration配置){services.AddMassTransit(x =>{RegisterMessageConsumers(x);RegisterRequestClients(x);x.UsingRabbitMq((context, cfg) =>{cfg.Host(configuration[EventBusConnection"], h =>{h.用户名(配置[EventBusUserName"]);h.Password(configuration[EventBusPassword"]);});cfg.ConfigureEndpoints(上下文);});});}
注意ConfigureEndpoints
应该在配置闭包的末尾
此外,您应该不要在异步方法中将 null
作为 Task
返回.
public class ConsultantFinishedEventConsumer : IConsumer{公共任务消费(ConsumeContext上下文){int xx = 0;返回 Task.CompletedTask;}}
最后,您在消费者方面的配置也需要调整.
public static IServiceCollection AddEventBus(这个IServiceCollection服务,IConfiguration配置){services.AddMassTransit(x =>{RegisterMessageConsumers(x);RegisterRequestClients(x);x.UsingRabbitMq((context, cfg) =>{cfg.Host(configuration[EventBusConnection"], h =>{h.用户名(配置[EventBusUserName"]);h.Password(configuration[EventBusPassword"]);});cfg.ConfigureEndpoints(上下文);});});}
默认情况下,MassTransit 将在消费者实例之间进行负载平衡.如果您想在多个实例中使用同一个使用者,您可以通过为使用者提供唯一的 instanceId 来实现.或者,您可以简单地指定仅在总线实例运行时才使用唯一端点来使用事件.
配置唯一的持久化 instanceId:
private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service){service.AddConsumer().端点(e =>{e.Name = "PatientQueue";e.InstanceId = 某些唯一值";e.PrefetchCount = 16;});}
将消费者配置为临时的,以便在实例退出时自动删除:
private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service){service.AddConsumer().端点(e =>{e.Name = "PatientQueue";e.InstanceId = 某些唯一值";e.PrefetchCount = 16;e.Temporary = true;});}
I use MassTransit and RabbitMQ in the sample project. I simply try to publish a message and then consume it by 2 separated Consumers. but each time just one of the consumers fired randomly! What is the problem?
this is the sender method
await _publishEndpoint.Publish(new ConsultantFinishedEvent(50));
this config in application startup in Publisher:
#region Service Bus Config
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
RegisterMessageConsumers(x);
RegisterRequestClients(x);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
cfg.Host(configuration["EventBusConnection"], h =>
{
h.Username(configuration["EventBusUserName"]);
h.Password(configuration["EventBusPassword"]);
});
});
});
services.AddMassTransitHostedService();
return services;
}
private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
//service.AddConsumer<OurConsumerClass>();
}
#endregion
This is simple consumers:
public class ConsultantFinishedEventConsumer : IConsumer<ConsultantFinishedEvent>
{
public Task Consume(ConsumeContext<ConsultantFinishedEvent> context)
{
int xx = 0;
return null;
}
}
and also this config that I use for all Consumers startups :
#region Service Bus
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
RegisterMessageConsumers(x);
RegisterRequestClients(x);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
cfg.Host(configuration["EventBusConnection"], h =>
{
h.Username(configuration["EventBusUserName"]);
h.Password(configuration["EventBusPassword"]);
});
//Register Consumers
cfg.ReceiveEndpoint("PatintQueue", ep =>
{
ep.PrefetchCount = 16;
});
});
});
services.AddMassTransitHostedService();
return services;
}
private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
service.AddConsumer<ConsultantFinishedEventConsumer>();
}
First off, your publisher configuration order needs a little adjustment:
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
RegisterMessageConsumers(x);
RegisterRequestClients(x);
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["EventBusConnection"], h =>
{
h.Username(configuration["EventBusUserName"]);
h.Password(configuration["EventBusPassword"]);
});
cfg.ConfigureEndpoints(context);
});
});
}
Additionally, you should not return null
as a Task
in an async method.
public class ConsultantFinishedEventConsumer : IConsumer<ConsultantFinishedEvent>
{
public Task Consume(ConsumeContext<ConsultantFinishedEvent> context)
{
int xx = 0;
return Task.CompletedTask;
}
}
Finally, your configuration on your consumer side needs adjustment as well.
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
RegisterMessageConsumers(x);
RegisterRequestClients(x);
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["EventBusConnection"], h =>
{
h.Username(configuration["EventBusUserName"]);
h.Password(configuration["EventBusPassword"]);
});
cfg.ConfigureEndpoints(context);
});
});
}
MassTransit will load balance across consumer instances by default. If you want to use the same consumer across multiple instances, you can do so by giving the consumer a unique instanceId. Alternatively, you could simply specify that a unique endpoint should be used to consume the event only when the bus instance is running.
To configure a unique persistent instanceId:
private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
service.AddConsumer<ConsultantFinishedEventConsumer>()
.Endpoint(e =>
{
e.Name = "PatientQueue";
e.InstanceId = "some-unique-value";
e.PrefetchCount = 16;
});
}
To configure the consumer as temporary, so it's automatically removed when the instance exits:
private static void RegisterMessageConsumers(IServiceCollectionBusConfigurator service)
{
service.AddConsumer<ConsultantFinishedEventConsumer>()
.Endpoint(e =>
{
e.Name = "PatientQueue";
e.InstanceId = "some-unique-value";
e.PrefetchCount = 16;
e.Temporary = true;
});
}
这篇关于每次我发布消息时,只有一个消费者随机触发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!