问题描述
这是我向事件网格发布到对象的方式.我希望能够使用azure服务总线来收听它.
here it is how I am publishing to an object to event grid. I want to be able to use the azure service bus to listen to it.
public void Publicar<T>(T model, string operation, string entity)
{
_nomeEvento = entity + operation;
Boolean.TryParse(Configuration["EventGridConfig:Enabled"], out var eventGridIsActive);
if (!eventGridIsActive)
return;
var primaryTopicKey = Configuration["EventGridConfig:AcessKey"];
var primaryTopic = Configuration["EventGridConfig:Endpoint"];
var primaryTopicHostname = new Uri(primaryTopic).Host;
var topicCredentials = new TopicCredentials(primaryTopicKey);
var client = new EventGridClient(topicCredentials);
client.PublishEventsAsync(primaryTopicHostname, GetEventsList(model)).GetAwaiter().GetResult();
}
private List<EventGridEvent> GetEventsList<T>(T model)
{
return new List<EventGridEvent>
{
new EventGridEvent()
{
Id = Guid.NewGuid().ToString(),
EventType = _nomeEvento,
Data = model,
EventTime = DateTime.Now,
Subject = "MS_Clientes",
DataVersion = "1.0",
}
};
}
这是我与服务总线的连接方式
here it is how i am connection to the service bus
static class CustomExtensionsMethods
{
public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
IHostingEnvironment env)
{
services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var keyName = "RootManageSharedAccessKey";
var busName = configuration["ServiceBus:Name"];
var secret = configuration["ServiceBus:Secret"];
var host = cfg.Host(
"Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
"SharedAccessKeyName=" + keyName + ";" +
"SharedAccessKey=" + secret,
z =>
{
TokenProvider
.CreateSharedAccessSignatureTokenProvider(keyName, secret);
});
cfg.ConfigureJsonSerializer(settings =>
{
settings.Converters.Add(new InterfaceConverter());
return settings;
});
cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
}));
services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
services.AddSingleton<IHostedService, BusService>();
return services;
}
}
但是我得到同样的错误
fail: MassTransit.Messages[0]
R-FAULT sb://dev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
我尝试添加我在网上找到的JsonConverter,但是没有运气
I have tried to add a JsonConverter I found online, but no luck
public class InterfaceConverter : JsonConverter
{
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
serializer.Serialize(writer, value);
}
public override object ReadJson(JsonReader reader, Type objectType, object existingValue,
JsonSerializer serializer)
{
// Set TypeNameHandling to Auto for deserializing objects with $type
// Should be set directly in ConfigureJsonDeserializer when setting up MT Service bus
serializer.TypeNameHandling = TypeNameHandling.Auto;
return serializer.Deserialize(reader);
}
public override bool CanConvert(Type objectType)
{
return objectType.IsInterface;
}
}
推荐答案
我尝试了几次测试,并提出了可行的解决方案.在我的测试案例中,我将消息从EventGridTopic重定向到ServiceBusQueue,就像您的案例一样-如果我理解得很好.
I tried a couple of tests and came up with a working solution. In my test case, I am redirecting messages from the EventGridTopic to the ServiceBusQueue, just like in your case - if I understood well.
由于MassTransit要求消息采用某种格式才能解释它们,因此我们需要确保具有以下内容:
Since MassTransit requires messages to be in a certain format in order to interpret them, we need to make sure to have the following:
- 用于事件网格消息的自定义反序列化器,其类型为 EventGridEvent
- 确保MassTransit必须使用的所有邮件具有ContentType -如果没有此功能,它将无法正常工作
- Custom deserializer for Event grid messages which are the type of EventGridEvent
- Make sure that all the messages that have to be consumed by MassTransit have ContentType - without this, it won't work
因此,我构建了一个示例,该示例可以在您从EventGrid重定向消息时使用,也可以在将消息直接通过管道传递到服务总线时使用.以下代码是如何为EventGrid消息实现反序列化的示例:
Therefore I built an example that can work if you are redirecting messages from EventGrid, but also if you pipe messages directly to the Service Bus.The following code is an example of how to implement deserializer for EventGrid messages:
public class EventGridMessgeDeserializer : IMessageDeserializer
{
private string _contentType;
public EventGridMessgeDeserializer(string contentType)
{
_contentType = contentType;
}
public ContentType ContentType => new ContentType(_contentType);
public ConsumeContext Deserialize(ReceiveContext receiveContext)
{
var body = Encoding.UTF8.GetString(receiveContext.GetBody());
var customMessage = JsonConvert.DeserializeObject<EventGridEvent>(body);
var serviceBusSendContext = new AzureServiceBusSendContext<EventGridEvent>(customMessage, CancellationToken.None);
// this is the default scheme, that has to match in order messages to be processed
// EventGrid messages type of EventGridEvent within namespace Microsoft.Azure.EventGrid.Models
string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" };
var serviceBusContext = receiveContext as ServiceBusReceiveContext;
serviceBusSendContext.ContentType = new ContentType(JsonMessageSerializer.JsonContentType.ToString());
serviceBusSendContext.SourceAddress = serviceBusContext.InputAddress;
serviceBusSendContext.SessionId = serviceBusContext.SessionId;
// sending JToken because we are using default Newtonsoft deserializer/serializer
var messageEnv = new JsonMessageEnvelope(serviceBusSendContext, JObject.Parse(body), messageTypes);
return new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv);
}
public void Probe(ProbeContext context)
{
}
}
这里重要的部分是您在自定义解串器中指定什么消息类型.由于MassTransit需要某种格式,并且会忽略不符合要求的消息,因此在这里我们按照MassTransit的要求指定该信息.
The important part here is that you specify in your custom deserializer what is the message type(s). Since MassTransit requires some format and ignores messages that do not comply, this is the place where we specify that piece of information as MassTransit requires.
string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" }
这是默认方案,必须匹配才能处理邮件
This is the default scheme, that has to match in order messages to be processed
最后,您可以在 Github 上找到完整的代码: https://github. com/kgalic/MassTransitSample
And finally, full code you can find on Github: https://github.com/kgalic/MassTransitSample
旁注::如果您要直接将消息发送到SB队列并想反序列化它们,则如前所述,您需要按如下所示指定ContentType:
Side note: If you are sending messages directly to the SB queue and want to deserialize them, as previously said, you need to specify the ContentType as it follows:
var message = new Message(UTF8Encoding.UTF8.GetBytes(request));
message.ContentType = "application/json"; //must have
await _senderClient.SendAsync(message);
如果您有类似的内容,则需要编写类似于EventGridEvent的解串器,您可以将其用作示例.
In case you have something like this, you need to write the deserializer similar as for EventGridEvent, which you can use as an example.
这篇关于如何为Azure服务编写MassTransit Json反序列化器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!