我正在使用以Mircosoft Orleans为基础的工作流引擎,因为它提供了许多有用的功能,例如自动分配工作和处理故障转移。
我有三种谷物:
工作流程-包含工作流程中的信息以及应在其中执行哪些订单工作块
工作块-实际完成工作的零件
执行-工作流程的一次执行
我的问题是,当运行大量当前执行时(即> 1000),性能确实受到影响。我做了一些分析,并将其范围缩小到谷物之间发生的交流。反正有什么我可以改善的呢?
这是我的代码的概述以及谷物之间的相互作用
执行粒度位于循环中,从工作流中获取下一个工作块,然后在工作块上调用execute。正是这种持续不断的调用导致我的一个测试工作流的执行时间从一次执行的10秒变为超过1000次的5分钟。这可以改善还是应该重新设计解决方案?删除粮食交流?
[StorageProvider(ProviderName = "WorkflowStore")]
[Reentrant]
[StatelessWorker]
public class Workflow : Grain<WorkflowState>, IWorkflow
{
public Task<BlockRef> GetNext(Guid currentBlockId, string connectionName)
{
//Lookup the next work block
}
}
[Reentrant]
[StatelessWorker]
public class WorkBlock : Grain<WorkBlock State>, IWorkBlock
{
public Task<string> Execute(IExecution execution)
{
//Do some work
}
}
[StorageProvider(ProviderName = "ExecutionStore")]
public class Execution : Grain<ExecutionState>, IExecution, IRemindable
{
private async Task ExecuteNext(bool skipBreakpointCheck = false)
{
if (State.NextBlock == null)
{
await FindAndSetNext(null, null);
}
...
var outputConnection = await workblock.Execute();
if (!string.IsNullOrEmpty(outputConnection))
{
await FindAndSetNext(State.NextBlock.Id, outputConnection);
ExecuteNext().Ignore();
}
}
private async Task FindAndSetNext(Guid? currentId, string outputConnection)
{
var next = currentId.HasValue ? await _flow.GetNextBlock(currentId.Value, outputConnection) : await _flow.GetNextBlock();
...
}
}
最佳答案
这里有几个问题:
1)Workflow是StatelessWorker并使用StorageProvider似乎并不正确。 StorageProvider表示它具有太在意的状态,StatelessWorker表示它不具有任何状态。而是使用常规的非StatelessWorker谷物。
2)让我们自上而下地进行建模:工作流只是有关工作流的数据和要执行的代码,工作块是多块工作流的一个块(多步工作流的一个步骤),对吗?在这种情况下,它们都不应该是谷物。他们只是状态。执行是唯一需要谷物的工具。执行接收工作流,工作流在其数据内编码下一个块是什么,而执行仅执行该块。
3)从可伸缩性的角度来看,您只需要很多执行粒度。如果工作流具有ID,则可以为每个工作流ID使用执行粒度。如果要并行执行相同的工作流(具有相同的ID),则现在要视情况而定。如果并行执行的次数不是太多,那么一个执行粒度就足够了。如果没有,则可以使用X执行粒度池(执行粒度的ID为“ WorkflowId-NumberBetween0AndX”)。