可指定并行度的任务调度器

/// <summary>
/// 指定最大并行度的任务调度器
/// </summary>
public class SpecifyDegreeOfParallelismTaskScheduler : TaskScheduler
{
/// <summary>
/// 信号量锁
/// </summary>
private static System.Threading.SemaphoreSlim _lock = new System.Threading.SemaphoreSlim(1); /// <summary>
/// 当前线程是否正在处理任务
/// </summary>
[ThreadStatic]
private static bool _currentThreadIsProcessingItems; /// <summary>
/// 执行的任务队列
/// </summary>
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); /// <summary>
/// 指定的最大并行度
/// </summary>
private readonly int _maxDegressOfParallelism; /// <summary>
///当前调度器中正在执行的任务数
/// </summary>
private int _runingTasks = 0; /// <summary>
/// 指示此调度器能够支持的最大并发级别。
/// </summary>
public override int MaximumConcurrencyLevel { get { return this._maxDegressOfParallelism; } } /// <summary>
/// 初始化一个可指定最大并行度的任务调度器
/// </summary>
/// <param name="maxDegreeOfParallelism">最大并行度</param>
public SpecifyDegreeOfParallelismTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1)
throw new ArgumentOutOfRangeException("maxDegreeOfParallelism至少为1");
this._maxDegressOfParallelism = maxDegreeOfParallelism;
} /// <summary>
/// 将Task排队到调度器中
/// </summary>
/// <param name="task">要排队的任务</param>
protected override void QueueTask(Task task)
{
_lock.Wait();
try
{
this._tasks.AddLast(task);
if (this._runingTasks < this._maxDegressOfParallelism)
{
++this._runingTasks;
ConsumeTaskOfPending();
}
}
finally
{
_lock.Release();
}
} /// <summary>
/// 尝试在当前线程上执行指定的任务
/// </summary>
/// <param name="task">被执行的任务</param>
/// <param name="taskWasPreviouslyQueued">指定的任务之前是否已经排队</param>
/// <returns>是否能在当前线程执行此任务</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
//如果当前前程没有正在处理项目,无法内联
if (!_currentThreadIsProcessingItems)
return false; //如果任务之前已经被排队,将其从队列中删除
if (taskWasPreviouslyQueued)
TryDequeue(task);
return base.TryExecuteTask(task);
} /// <summary>
/// 消费队列中等待的任务
/// </summary>
private void ConsumeTaskOfPending()
{
ThreadPool.UnsafeQueueUserWorkItem(p =>
{
_currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
_lock.Wait();
try
{
if (this._tasks.Count == 0)
{
--this._runingTasks;
break;
}
item = this._tasks.First.Value;
this._tasks.RemoveFirst();
}
finally
{
_lock.Release();
}
base.TryExecuteTask(item);
}
}
finally
{
_currentThreadIsProcessingItems = false;
}
}, null);
} /// <summary>
/// 尝试将任务从队列移除
/// </summary>
/// <param name="task">要移除的任务</param>
/// <returns>是否成功将任务从队列中移除</returns>
protected override bool TryDequeue(Task task)
{
_lock.Wait();
try
{
return this._tasks.Remove(task);
}
finally
{
_lock.Release();
}
} /// <summary>
/// 获取当前调度器中已调度任务序列
/// </summary>
/// <returns>可遍历已调度任务序列</returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
_lock.Wait();
try
{
return this._tasks.ToArray();
}
finally
{
_lock.Release();
}
} }
04-05 13:39