我想实现一个优先的ActionBlock<T>
。这样我就可以通过使用TInput
有条件地优先考虑某些Predicate<T>
项。
我读了 Parallel Extensions Extras Samples和Guide to Implementing Custom TPL Dataflow Blocks。
但是仍然不知道如何实现这种情况。
- - - - - - - - - - - - - - 编辑 - - - - - - - - - - - ------
有一些任务,其中5个可以同时运行。当用户按下按钮时,某些任务(取决于谓词功能)应以最高优先级运行。
实际上我写这段代码
TaskScheduler taskSchedulerHighPriority;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...
if (predicate(customObject))
actionBlockHigh.Post(customObject);
else
actionBlockLow.Post(customObject);
但是,似乎优先权根本没有生效。- - - - - - - - - - - - - - 编辑 - - - - - - - - -
我发现以下事实:当我使用以下代码行时:
actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow });
使应用程序正确观察任务的优先级,但一次只能执行一个任务,同时使用流中显示的第一个代码块,使应用程序同时运行5个任务,但优先级顺序不正确。actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow });
更新:坦克要检查,我应该为
MaxMessagesPerTask
指定taskSchedulerLow
。 最佳答案
您的问题并不包含很多详细信息,因此以下内容仅是您可能需要的猜测。
我认为最简单的方法是让两个ActionBlock
在 QueuedTaskScheduler
from ParallelExtensionsExtras上以不同的优先级运行。您将使用谓词链接到高优先级的一个,然后再链接到低优先级的一个。另外,为确保高优先级Task
不等待,请设置低优先级块的MaxMessagesPerTask
。
在代码中,它看起来像:
static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
Action<T> action, Predicate<T> isPrioritizedPredicate)
{
var buffer = new BufferBlock<T>();
var scheduler = new QueuedTaskScheduler(1);
var highPriorityScheduler = scheduler.ActivateNewQueue(0);
var lowPriorityScheduler = scheduler.ActivateNewQueue(1);
var highPriorityBlock = new ActionBlock<T>(
action, new ExecutionDataflowBlockOptions
{
TaskScheduler = highPriorityScheduler
});
var lowPriorityBlock = new ActionBlock<T>(
action, new ExecutionDataflowBlockOptions
{
TaskScheduler = lowPriorityScheduler,
MaxMessagesPerTask = 1
});
buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate);
buffer.LinkTo(lowPriorityBlock);
return buffer;
}
这只是您可以执行的操作的草图,例如,返回的块的
Completion
不能正确运行。