问题描述
我正在尝试使用 MassTransit
构建 Kafka 消费者我有这段代码
var services = new ServiceCollection();services.AddMassTransit(x =>{x.AddRider(骑手 =>{Rider.AddProducer(request", m => m.Message.RequestId);骑手.使用Kafka((上下文,k)=>{k.Host(本地主机:9092");});});});var provider = services.BuildServiceProvider();var producer = provider.GetRequiredService>();等待 producer.Produce(new Request(){RequestId = "abc123",RequestedAt = DateTime.UtcNow});
这是此处
但是当我尝试运行它时,出现此异常
未处理的异常.System.InvalidOperationException: 没有注册MassTransit.Registration.IBusInstance"类型的服务.
查看他们网站上的示例,我发现这可能与我尚未注册 RabbitMQ 的事实有关
x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
但我没有 RabbitMQ,我只在这种情况下使用 Kafka.
是否需要向其他一些消息代理注册总线才能生成到 Kafka?
来自文档:
随着 MassTransit v7 引入的 Riders,提供了一种将消息从任何来源传送到总线的新方法.车手随车一起配置,开车时上车.
要添加乘客,必须有一个巴士实例.如果您不需要具有持久传输的总线(例如 RabbitMQ),则可以使用内存传输.
var services = new ServiceCollection();services.AddMassTransit(x =>{x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));x.AddRider(骑手 =>{Rider.AddProducer(request", m => m.Message.RequestId);骑手.使用Kafka((上下文,k)=>{k.Host(本地主机:9092");});});});
巴士需要启动和停止,这也将启动/停止巴士上的所有乘客.您可以通过 IBusControl
执行此操作:
var provider = services.BuildServiceProvider();var busControl = provider.GetRequiredService();等待 busControl.StartAsync(cancellationToken);
或者,如果您使用的是 ASP.NET Core 通用主机,则添加 MassTransit 托管服务.
services.AddMassTransitHostedService();//在 MassTransit.AspNetCore 中
I'm trying to build a Kafka consumer using MassTransit
I have this piece of code
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddRider(rider =>
{
rider.AddProducer<string, Request>("request", m => m.Message.RequestId);
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
});
});
});
var provider = services.BuildServiceProvider();
var producer = provider.GetRequiredService<ITopicProducer<Request>>();
await producer.Produce(new Request()
{
RequestId = "abc123",
RequestedAt = DateTime.UtcNow
});
This is the simplest example of a producer from here
but when I try to run it, I get this exception
Unhandled exception. System.InvalidOperationException: No service for type 'MassTransit.Registration.IBusInstance' has been registered.
Looking at the example from their website, I see that it could be related to the fact that I haven't registered a RabbitMQ
x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
But I don't have a RabbitMQ, I only use Kafka in this scenario.
Is it necessary to register a bus with some other message broker in order to produce to Kafka?
From the documentation:
To add riders, there must be a bus instance. If you don't need a bus with a durable transport such as RabbitMQ, you can use the in-memory transport.
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddProducer<string, Request>("request", m => m.Message.RequestId);
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
});
});
});
The bus needs to be started and stopped, which will also start/stop any riders on the bus. You can do this via IBusControl
:
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync(cancellationToken);
Or by adding the MassTransit Hosted Service if you're using the ASP.NET Core Generic Host.
services.AddMassTransitHostedService(); // in MassTransit.AspNetCore
这篇关于带有 MassTransit 的 Kafka Producer - IBusInstance 尚未注册的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!