Jimmy Boagard描述了麦当劳快餐链式店here与scatter gather pattern.的比较
从以上文章中窃取的工作流图像:
最初的实现思路:
为所有食品站都将获得的所有类型的FoodOrdered事件具有一个公共(public)界面,然后每个食品站将能够使用/创建其各自的项目并发布一个公共(public)完成的事件。例如:炸薯条和汉堡站收到有关炸薯条订单的消息,炸薯条站消费该订单,以宣布传奇故事正在侦听的ItemDoneEvent。
初始关注点:
由于Saga并不关心完成的食物的类型,仅考虑所有食物都完成的事实,这似乎是一个好的解决方案。但是,之后的读取警告here有关队列共享并注意到Consumer.Conditional filtering has been removed with MassTransit 3.0的警告。该框架似乎在说这种类型的方法将“发生Bad Things(TM)”。但是我不确定如果不为厨房中的每个食品创建消息请求和响应以及关联事件,您还能怎么做。例如:FriesOrdered,BurgerOrdered FriesCooked,BurgerCooked。如果您必须对厨房中的每个物品都这样做,那将非常繁琐?
考虑到以上问题,此类工作流的一个传奇实例看起来像什么?
最佳答案
我遇到了类似的问题-需要发布几十个命令(所有相同的接口(interface),IMyRequest
)并等待所有。
实际上,我的命令启动了其他传奇,这些传奇在处理结束时发布IMyRequestDone
而不标记传奇已完成。 (需要在以后的某个时间完成它们。)因此,不是在父传奇中保存已完成的嵌套sagas的数量,而是查询子传奇实例的状态。
检查每条MyRequestDone
消息:
Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
// timeout for all requests
x.Delay = TimeSpan.FromMinutes(10);
x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});
During(Active,
When(Xxx)
.ThenAsync(async context =>
{
await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));
context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
context.Instance.WaitingMyResponsesCount = 2;
})
.TransitionTo(WaitingMyResponses)
.Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
);
During(WaitingMyResponses,
When(MyRequestDone)
.Then(context =>
{
if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
throw new TimeoutException();
})
.If(context =>
{
var db = serviceProvider.GetRequiredService<DbContext>();
var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
return allDone;
}, x => x
.Unschedule(FailSagaOnRequestsTimeout)
.TransitionTo(Active))
)
.Catch<TimeoutException>(x => x.TransitionTo(Failed))
);
During(WaitingMyResponses,
When(FailSagaOnRequestsTimeout.Received)
.TransitionTo(Failed)
定期检查是否已完成所有请求(通过“减少NServiceBus Saga负载”):
Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
// check interval
x.Delay = TimeSpan.FromSeconds(15);
x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});
During(Active,
When(Xxx)
.ThenAsync(async context =>
{
await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));
context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
context.Instance.WaitingMyResponsesCount = 2;
})
.TransitionTo(WaitingMyResponses)
.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
);
During(WaitingMyResponses,
When(CheckAllRequestsDone.Recieved)
.Then(context =>
{
var db = serviceProvider.GetRequiredService<DbContext>();
var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
if (!allDone)
{
if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)
throw new TimeoutException();
throw new NotAllDoneException();
}
})
.TransitionTo(Active)
.Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
.Catch<TimeoutException>(x => x.TransitionTo(Failed));
关于c# - 如何在MassTransit 3.0中使用分散/聚集模式实现传奇,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33579533/