问题描述
我正在使用Rebus,我想介绍 CQRS旅程中所述的内容在"避免多次处理事件"中,但我无法弄清楚.
I'm using Rebus and I want to introduce something like described in CQRS Journey at paragraph "Avoid processing events multiple times", but I cannot figure it out.
我将Rebus配置为将SQL Server用于 Transport ,将MongoDB用于 Subscriptions 和 Sagas . 路由配置为 TypeBased ,并将所有命令处理程序的类型映射到 Transport 中配置的队列.
I configured Rebus to use SQL Server for Transport and MongoDB for Subscriptions and Sagas. The Routing is configured TypeBased and maps all the commands handlers' types to the queue configured in Transport.
var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
.Logging(l => l.Trace())
.Transport(t =>
{
t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName");
})
.Routing(r => r.TypeBased()
.MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName")
.MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName")
)
.Sagas(s => s.StoreInMongoDb(db, (sagaType) =>
{
return sagaType.Name;
}))
.Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions"))
.Options(o =>
{
o.SetNumberOfWorkers(1);
o.SetMaxParallelism(1);
o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots");
})
.Start();
现在,我应该以一种方式配置Rebus:当命令 Publish 一个事件时,该事件将复制到与现有订户类型一样多的单独主题(虚拟或物理队列)中.
Now I should configure Rebus in a way that when a command Publish an event, this is replicated in as many separate topics (virtual or physical queues) as existing subscribers' types.
类似的东西:
bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait();
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait();
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait();
感谢您的帮助.
推荐答案
有些问题似乎使您的问题感到困惑.
There's a few things that seem confusing with your question.
但是我想您的基本问题是如何确保每条消息仅由每个订户处理一次.
But I guess your fundamental question is how to be sure that each message is processed one time only by each subscriber.
答案很简单:每个订户都有一个单独的终结点-这意味着每个订户将有自己的输入队列,将从中处理消息,并向失败的消息返回.
The answer is pretty easy: Have a separate endpoint for each subscriber - this means that each subscriber will have its own input queue which messages get processed from, and which a failed message will be returned to.
然后,您可以根据需要在每个订户中具有任意数量的处理程序.所有兼容的处理程序将针对每条传入的消息执行.
You can then have as many or as few handlers in each subscriber as you want. All compatible handlers will be executed for each incoming message.
使用Rebus,对Configure.With(...).(...).Start()
的每次调用都将为您提供一个单独的终结点-因此,在您的情况下,我建议您将用户终结点创建包装到一个方法中,然后可以像这样调用:
With Rebus, each invocation to Configure.With(...).(...).Start()
will give you a separate endpoint - so in your case, I suggest you wrap the subscriber endpoint creation in a method, which you can then invoke like this:
var event1Subscriber = CreateSubscriber("subscriber_event1");
event1Subscriber.Subscribe<Event1>().Wait();
var event2Subscriber = CreateSubscriber("subscriber_event2");
event2Subscriber.Subscribe<Event2>().Wait();
var event3Subscriber = CreateSubscriber("subscriber_event3");
event3Subscriber.Subscribe<Event3>().Wait();
// ...
其中CreateSubscriber
将是这样的:
public IBus CreateSubscriber(string queueName)
{
return Configure.With(GetContainerAdapter())
.Transport(t => t.UseMsmq(queueName))
.Start();
}
这篇关于如何将Rebus配置为具有基于处理程序类型的主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!