我有一个MassTransit传奇状态机(源自Automatonymous.MassTransitStateMachine),并且我尝试解决仅在将端点配置prefetchCount设置为大于1时才出现的问题。
问题是'StartupCompletedEvent'已发布,然后立即处理,然后将传奇状态保存到数据库中。
状态机的配置如下:
State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Requested)
.ThenAsync(async ctx =>
{
Console.WriteLine("Starting up...");
await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
Console.WriteLine("Done starting up...");
}
.TransitionTo(StartingUp)
);
During(StartingUp,
When(StartupCompleted)
.ThenAsync(InitialiseSagaInstanceData)
.TransitionTo(Initialising)
);
// snip...!
我的传奇收到Requested事件时会发生以下情况:
Initially块的ThenAsync处理程序被命中。在这一点上,没有传奇数据持久化到仓库(如预期)。
StartupCompletedEvent已发布到总线。这里也没有任何传奇数据持久化到仓库中。
初始声明的ThenAsync块完成。此后,传奇数据最终得以保留。
什么都没发生。
此时,队列中没有消息,并且StartupCompletedEvent丢失。但是,数据库中存在一个传奇实例。
我玩过启动游戏,并确定其他线程之一(因为我的预取> 1)已经拾取了该事件,在数据库中未找到任何与correlationId相关的传奇,并丢弃了该事件。因此,该事件将在传奇得以持久之前被发布和处理。
如果我将以下内容添加到Initially处理程序中:
When(StartupCompleted)
.Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))
然后让Console.WriteLine执行。我对此的理解是,该事件已被接收,但是由于没有相关ID的传奇存在,因此被路由到Initially处理程序。如果我在此处放置一个断点并检查传奇故事回购,则还没有传奇故事。
可能值得一提其他几点:
我将我的传奇回购上下文设置为使用IsolationLevel.Serializable
我正在使用EntityFrameworkSagaRepository
当“预取”计数设置为1时,一切正常
我正在使用Ninject for DI,并且我的SagaRepository是Thread作用域的,所以我想象每个预取计数允许的处理程序都有其自己的saga存储库副本
如果我将StartupCompletedEvent发布在一个单独的线程中,并且该线程之前的睡眠时间为1000ms,那么事情就可以正常进行。我认为这是因为传奇存储库已完成持久存储传奇状态,因此当事件最终被发布并由处理程序接收时,可以从存储库中正确检索传奇状态。
如果我遗漏了任何东西,请告诉我;我试图提供我认为有价值的一切,而又不要让这个问题过长。
最佳答案
我也有这个问题,我想发表克里斯的评论作为答案,以便人们可以找到它。
解决方案是启用发件箱,以便保留邮件,直到持续保留传奇。
c.ReceiveEndpoint("queue", e =>
{
e.UseInMemoryOutbox();
// other endpoint configuration here
}
关于c# - 使用预取> 1运行的MassTransit传奇,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38716143/