//--------------------------------------------------------------------------
//
// Copyright (c) BUSHUOSX. All rights reserved.
//
// File: AsyncTaskManager.cs
//
// Version:1.1.0.6
//
// Datetime:20170813
//
//-------------------------------------------------------------------------- using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks; namespace BUSHUOSX
{
class AsyncTaskManager
{
/// <summary>
/// 缓存的任务队列
/// </summary>
readonly Queue<Task> _taskQueue = new Queue<Task>(); /// <summary>
/// 工作锁,保护_taskQueue
/// </summary>
SpinLock _workLock; /// <summary>
/// 工作信号,与MaxConcurrencyLevel控制并行量
/// </summary>
SemaphoreSlim _workSemaphore; /// <summary>
/// 工作线程取消标志
/// </summary>
CancellationTokenSource _ctsCancel;
/// <summary>
/// 工作器每次启动的工作时限
/// </summary>
int _timeoutMillisecond;
/// <summary>
/// 工作线程
/// </summary>
Task _worker; /// <summary>
/// 工作器状态
/// </summary>
private bool IsWorking { get; set; } /// <summary>
/// 任务最大并发量
/// </summary>
public int MaxConcurrencyLevel { get; } /// <summary>
/// 内部工作器将在队列中有任务时自动启动。否则由Start方法启动。
/// </summary>
public bool AutoRunWorker { get; } /// <summary>
/// 队列中的任一任务完成时,都将调用
/// </summary>
private Action<Task> _callbackOnAnyTaskComplited; /// <summary>
/// 控制异步任务的并发量。
/// 注意:只能严格控制stauts为Created的任务
/// </summary>
/// <param name="maxConcurrencyLevel">最大并发数。小于等于0时设置为int.MaxValue</param>
/// <param name="callbackOnAnyTaskComplited">不为null时,队列中的任一任务完成后都将传递给此回调方法</param>
/// <param name="autoRunWorker">指示内部工作器是在内部队列排入任务时自动启动,还是由Start方法启动。</param>
/// <param name="timeout">调度完所有任务的时限。小于等于0时不设置超时</param>
public AsyncTaskManager(int maxConcurrencyLevel, Action<Task> callbackOnAnyTaskComplited = null, bool autoRunWorker = true, int timeoutMillisecond = )
{
_callbackOnAnyTaskComplited = callbackOnAnyTaskComplited;
AutoRunWorker = autoRunWorker;
MaxConcurrencyLevel = maxConcurrencyLevel <= ? int.MaxValue : maxConcurrencyLevel;
_timeoutMillisecond = timeoutMillisecond <= ? : timeoutMillisecond;
} /// <summary>
/// 排入一个任务到内部队列,该队列中的任务将被依次调用。
/// </summary>
/// <param name="task">要排队的任务。注意:只能严格控制stauts为Created的任务</param>
/// <param name="callbackOnTaskComplited">task任务完成时回调。如果所有任务使用同样的回调方法,建议使用构造函数中的callbackOnAnyTaskComplited</param>
public void QueueTask(Task task, Action<Task> callbackOnTaskComplited = null)
{
if (task == null) return;
if (null == callbackOnTaskComplited)
{
EnqueueTask(task);
}
else
{
EnqueueTask(task.ContinueWith(callbackOnTaskComplited));
}
if (AutoRunWorker)
{
notifyStartWork();
}
} /// <summary>
/// 枚举任务到内部队列,该队列中的任务将被依次调用。
/// </summary>
/// <param name="tasks">要排队的任务。注意:只能严格控制stauts为Created的任务</param>
/// <param name="callbackOnTaskComplited">tasks中的每个任务完成时回调</param>
public void QueueTask(IEnumerable<Task> tasks, Action<Task> callbackOnTaskComplited = null)
{
foreach (var item in tasks)
{
if (item == null) break;
if (null == callbackOnTaskComplited)
{
EnqueueTask(item);
}
else
{
EnqueueTask(item.ContinueWith(callbackOnTaskComplited));
}
}
if (AutoRunWorker)
{
notifyStartWork();
}
} /// <summary>
/// 返回此刻队列中的任务。
/// </summary>
/// <returns></returns>
public Task[] GetQueueTask()
{
bool gotlock = false;
try
{
_workLock.Enter(ref gotlock);
if (_taskQueue.Count > )
{
return _taskQueue.ToArray();
}
else
{
return null;
}
}
finally
{
if (gotlock) _workLock.Exit();
}
} /// <summary>
/// 启动内部工作器。
/// 注意:为降低资源占用,该工作器在内部队列为空时会自动退出。
/// </summary>
public void Start()
{
notifyStartWork();
} /// <summary>
/// 阻塞线程,等待内部工作器运行结束
/// </summary>
/// <returns>RanToCompletion:所有队列任务已被调度。Canceled:手动取消或挂起了任务,或任务超时。Faulted:未知错误。</returns>
public TaskStatus WaitTaskSchdulerComplited()
{
if (_worker == null) throw new NotSupportedException("_worker is null");
try
{
_worker.Wait();
}
catch (Exception)
{
}
return _worker.Status;
} /// <summary>
/// 挂起队列中剩余的任务。稍后可以使用Continue方法继续。
/// </summary>
public void Suspend()
{
stopWorkThread(false);
} /// <summary>
/// 停止工作器,并清空内部任务队列还未调用的任务。
/// 已调用的任务还将继续运行。
/// </summary>
public void Cancel()
{
stopWorkThread(true);
} /// <summary>
/// 停止工作器
/// </summary>
/// <param name="clearTasks">true时清空内部任务队列</param>
private void stopWorkThread(bool clearTasks)
{
if (IsWorking)
{
_ctsCancel.Cancel();
if (clearTasks)
{
bool gotlock = false;
try
{
_workLock.Enter(ref gotlock);
_taskQueue.Clear();
}
finally
{
if (gotlock) _workLock.Exit();
}
}
}
} /// <summary>
/// 继续之前挂起的任务。
/// </summary>
public void Continue()
{
notifyStartWork();
} /// <summary>
/// 内部启动工作器
/// </summary>
private void notifyStartWork()
{
if (IsWorking) return; //初始化
_ctsCancel = new CancellationTokenSource();
if (_timeoutMillisecond > )
{
_ctsCancel.CancelAfter(_timeoutMillisecond);
}
_workLock = new SpinLock();
_workSemaphore = new SemaphoreSlim(MaxConcurrencyLevel, MaxConcurrencyLevel); _worker = Task.Run(new Action(workerThread), _ctsCancel.Token);
} /// <summary>
/// 任一任务完成时调用
/// </summary>
/// <param name="task"></param>
private void anyTaskComplited(Task task)
{
_workSemaphore.Release();
//todo task
_callbackOnAnyTaskComplited?.Invoke(task);
//Debug.WriteLine("完成任务{0}:{1}", task.Id, task.Status.ToString());
} /// <summary>
/// 工作器线程执行方法。只应存在一个。
/// </summary>
private void workerThread()
{
IsWorking = true;
Debug.WriteLine("工作线程启动……");
try
{
Task tmp = null;
while (true)
{
#if DEBUG
//不恰当的操作,只为屏蔽调试时错误
//会导致_worker状态为RanToCompletion
try
{
_workSemaphore.Wait(_ctsCancel.Token);
}
catch (OperationCanceledException)
{
//_ctsCancel.Token.ThrowIfCancellationRequested();
return;
}
#else
_workSemaphore.Wait(_ctsCancel.Token); //传递取消状态
_ctsCancel.Token.ThrowIfCancellationRequested();
#endif tmp = DequeueTask();
if (tmp != null)
{
if (tmp.Status == TaskStatus.Created)
{
tmp.Start();
}
tmp.ContinueWith(anyTaskComplited);
}
else
{
if (_taskQueue.Count == )
{
Debug.WriteLine("workerAsync:taskQueue is empty");
break;
}
}
}
}
finally
{
//notifyEndWork();
IsWorking = false;
Debug.WriteLine("工作线程结束……");
}
} /// <summary>
/// 排入任务,期望线程安全
/// </summary>
/// <param name="task"></param>
private void EnqueueTask(Task task)
{
bool gotlock = false;
try
{
_workLock.Enter(ref gotlock);
_taskQueue.Enqueue(task);
}
finally
{
if (gotlock) _workLock.Exit();
} } /// <summary>
/// 弹出任务,期望线程安全
/// </summary>
/// <returns></returns>
private Task DequeueTask()
{
bool gotlock = false;
try
{
_workLock.Enter(ref gotlock);
if (_taskQueue.Count > )
{
return _taskQueue.Dequeue();
}
else
{
return null;
}
}
finally
{
if (gotlock) _workLock.Exit();
}
}
}
}