本文介绍了带有 MassTransit 的 Kafka Producer - IBusInstance 尚未注册的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 MassTransit

构建 Kafka 消费者

我有这段代码

var services = new ServiceCollection();services.AddMassTransit(x =>{x.AddRider(骑手 =>{Rider.AddProducer(request", m => m.Message.RequestId);骑手.使用Kafka((上下文,k)=>{k.Host(本地主机:9092");});});});var provider = services.BuildServiceProvider();var producer = provider.GetRequiredService>();等待 producer.Produce(new Request(){RequestId = "abc123",RequestedAt = DateTime.UtcNow});

这是此处

但是当我尝试运行它时,出现此异常

未处理的异常.System.InvalidOperationException: 没有注册MassTransit.Registration.IBusInstance"类型的服务.

查看他们网站上的示例,我发现这可能与我尚未注册 RabbitMQ 的事实有关

x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

但我没有 RabbitMQ,我只在这种情况下使用 Kafka.

是否需要向其他一些消息代理注册总线才能生成到 Kafka?

解决方案

来自文档:

随着 MassTransit v7 引入的 Riders,提供了一种将消息从任何来源传送到总线的新方法.车手随车一起配置,开车时上车.

要添加乘客,必须有一个巴士实例.如果您不需要具有持久传输的总线(例如 RabbitMQ),则可以使用内存传输.

var services = new ServiceCollection();services.AddMassTransit(x =>{x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));x.AddRider(骑手 =>{Rider.AddProducer(request", m => m.Message.RequestId);骑手.使用Kafka((上下文,k)=>{k.Host(本地主机:9092");});});});

巴士需要启动和停止,这也将启动/停止巴士上的所有乘客.您可以通过 IBusControl 执行此操作:

var provider = services.BuildServiceProvider();var busControl = provider.GetRequiredService();等待 busControl.StartAsync(cancellationToken);

或者,如果您使用的是 ASP.NET Core 通用主机,则添加 MassTransit 托管服务.

services.AddMassTransitHostedService();//在 MassTransit.AspNetCore 中

I'm trying to build a Kafka consumer using MassTransit

I have this piece of code

var services = new ServiceCollection();
services.AddMassTransit(x =>
{
    x.AddRider(rider =>
    {
        rider.AddProducer<string, Request>("request", m => m.Message.RequestId);

        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

var provider = services.BuildServiceProvider();
var producer = provider.GetRequiredService<ITopicProducer<Request>>();

await producer.Produce(new Request()
{
    RequestId = "abc123",
    RequestedAt = DateTime.UtcNow
});

This is the simplest example of a producer from here

but when I try to run it, I get this exception

Unhandled exception. System.InvalidOperationException: No service for type 'MassTransit.Registration.IBusInstance' has been registered.

Looking at the example from their website, I see that it could be related to the fact that I haven't registered a RabbitMQ

x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

But I don't have a RabbitMQ, I only use Kafka in this scenario.

Is it necessary to register a bus with some other message broker in order to produce to Kafka?

解决方案

From the documentation:

To add riders, there must be a bus instance. If you don't need a bus with a durable transport such as RabbitMQ, you can use the in-memory transport.

var services = new ServiceCollection();
services.AddMassTransit(x =>
{
    x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));

    x.AddRider(rider =>
    {
        rider.AddProducer<string, Request>("request", m => m.Message.RequestId);

        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

The bus needs to be started and stopped, which will also start/stop any riders on the bus. You can do this via IBusControl:

var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();

await busControl.StartAsync(cancellationToken);

Or by adding the MassTransit Hosted Service if you're using the ASP.NET Core Generic Host.

services.AddMassTransitHostedService(); // in MassTransit.AspNetCore

这篇关于带有 MassTransit 的 Kafka Producer - IBusInstance 尚未注册的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 00:58