不用任何与创建线程、资源互斥有关系的API写多线程程序
这次的例子,是一个很简单的控制台,她将面对瞬间提交的百万的数据,而面不改色(CPU、内存非常平稳),队列中始终只保存最新的数据,每次只处理cpu 个数据(我的机器是双核的,所以,在我这里,就是每个CPU一个线程,真正的并行运行哦....),OK不废话,进入正题:
呃,既然是实例,那么就直接看代码好了:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using Microsoft.Ccr.Core; namespace CCRDemo1 { class Program { static void Main(string[] args) { int maxiQueueDepth = ; // step1: 创建一个Dispatcher对象 Dispatcher dispatcher = new Dispatcher(, "调度器名称"); // step2: 创建一个与step1创建对象关联的DispatcherQueue对象 DispatcherQueue depthThrottledQueue = new DispatcherQueue( "任务队列的名称", // 关联到该队列的调度器 dispatcher, // 队列保存数据的策略:保存最近消息策略 TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, // 队列的深度 maxiQueueDepth ); // step3: 创建一个能够接收整型数据的Port Port<int> intPort = new Port<int>(); // step4: 把Port与处理函数关联,然后再与DispatcherQueue关联 Arbiter.Activate(depthThrottledQueue, Arbiter.Receive(true, intPort, delegate(int i) // 这里用了一个匿名方法,作为处理函数 { Thread.Sleep(); Console.WriteLine("[{0}] {1}", DateTime.Now.ToString("o"), i); } ) ); // step5: 快速的提交大量的任务 Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o")); for (int i = ; i < maxiQueueDepth * ; i++) { // 把数据Post到intPort内 intPort.Post(i); } Console.WriteLine("[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o")); Console.WriteLine("Press any key to exit"); Console.ReadKey(); dispatcher.Dispose(); } } }
二、原理讲解
其实在代码里面的注释,我想应该写的很明白了,不过毕竟是第一个例子,我还是稍微讲些CCR的大致原理(至于详细的实现原理,我后面会专门起文想说,这次主要的目的还是,先领略下CCR的神奇之处):
首先,多线程是以操作系统线程池的形式被Dispatcher管理的,因此创建线程的工作实际上由Dispatcher代劳了;
其次,在CCR内,他规定Dispatcher只能处理任务,而这个任务只能从任务队列DispatcherQueue内获取,因此,我们要创建任务队列,并关联上Dispatcher;
然后,我们把自己要提交给处理函数处理的数据,封装在Port<T>内,Port其实是一个FIFO队列,专门用来接收用户提交的数据的;
最后,我们要把数据、处理函数、任务队列 组合起来,这就是上面代码中的step4,这步其实做了2个工作:
1、把port和处理函数,封装为Receive关联起来;
2、把Receive和DispatcherQueue关联起来;
这样,我们就完成了,所有的工作。
总之,CCR提供了一个模式,让我们只需要把需要并发、异步处理的工作,分解为:
1、输入数据--->post到Port内;
2、处理过程--->做成委托关联到任务队列中
这种方法,是的编写多线程程序的工作大大简化了,而且也能够让后台的代码能够被编译器统一优化。
CCR任务调度的原理和策略
二、CCR的任务调度
查阅MSDN:http://msdn.microsoft.com/en-us/library/bb648756.aspx
可知:(下面引用了Ncindy翻译的部分内容,感谢ncindy的辛苦劳动)
【一】:当一个元素被投递到附加了接收器的port,port的实现中将会发生如下操作:
step1. 为投递进来的元素创建一个容器。容器的类型(IPortElement)允许CCR在不知道元素类型的情况下将元素排队并将元素赋值给Task实例。
step2. 容器被放入队列。
step3. 如 果接收器列表不是null,并且其中有一个以上的接收器,port对象将会调用ReceiverTask.Evaluate方法来让接收器和它里面的仲裁 器层次检测元素是否可以被使用,在这个例子中,Evaluate方法将会返回true,并使用收到的元素和用户的delegate作为参数创建一个 Task<int>实例。
step4. port使用调用Evaluate方法返回的Task对象作为参数调用taskQueue.Enqueue,注意,当一个接收器是第一次被激活,它会被关联到由Arbiter.Activate方法提供的DispatcherQueue实例。
当上面的4步完成之后,生成的Task对象现在已经被调度逻辑分发(dealt)给了对应的DispatcherQueue。
【二】:一旦一个元素被放入DispatcherQueue,接下来将会做如下操作:
step1. DispatcherQueue向它所属的Dispatcher发信号,告诉Dispatcher一个新的任务可以被执行了。
step2. Dispatcher通知一个或者多个TaskExecutionWorker类型对象。每个TaskExecutionWorker对象管理一个操作系统线程。它将线程设置到一种高效的休眠状态,直到Dispatcher发出信号通知有元素可以被调度时。
step3. TaskExecutionWorker对象调用DispatcherQueue.Test方法从队列中获取一个任务。如果是可用的任务,TaskExecutionWorker对象则调用ITask.Execute。
step4. Task.Execute方法调用关联在task对象上的delegate,并将一个或者多个关联在task上的参数传递进去。
总之:在CCR中,线程池处理的任务,是由DispatcherQueue产生的;而DispathcerQueue有是根据用户线程通过Port或PortSet提交给的数据 和 初始化时指定的委托来产生任务的。因此可知影响任务调度的地方有3处:
1、客户端提交数据的地方:Port/PortSet的Post方法;
2、DispatcherQueue产生任务的地方:ReceiverTask的Evaluate方法;
3、Dispacher内线程池处理任务的地方:Task执行关联delegate的Execute方法;
而CCR就是通过给上面三处加入调度机制来达到任务调度的负载均衡目的的。
三、CCR的四种任务调度策略
using System; namespace Microsoft.Ccr.Core { public enum TaskExecutionPolicy { Unconstrained = , ConstrainQueueDepthDiscardTasks = , ConstrainQueueDepthThrottleExecution = , ConstrainSchedulingRateDiscardTasks = , ConstrainSchedulingRateThrottleExecution = , } }
这4中策略分别应用在一下场景:
1、ConstrainQueueDepthDiscardTasks 按队列深度丢弃最旧任务
适用于:要处理的消息可以丢弃但是必须保存最近N条的情况。这对于CPU处理速度低于消息产生速度的情况很有好处,该策略能够保证丢弃的最旧任务的同时最新的N个任务能都得到调度。特别是在阻塞深度为1的时候,队列中保存的始终都是最新的任务。
2、ConstrainQueueDepthThrottleExecution 按照队列深度阻塞任务产生
适用于:消息不是规律产生,而是随机、爆炸性到达的情况。这对于来自网络获知其他机器的消息很相似,该策略保证任务不会被丢失,通过阻塞消息Post到Port/PortSet的方法来降低任务产生的速度。
3、ConstrainSchedulingRateDiscardTasks 按照固定速度处理消息且丢失未处理的最旧消息
适用于:处理产生速度有规律的消息,比如播放视频。在这种情况下一般所有的消息已经不是最重要的了,但保存最新的消息却很有意义,该策略能够保证代码会以固定的速度执行,即使消息以爆炸式的速度产生也没关系。
4、ConstrainSchedulingRateThrottleExecution 按照固定速度处理消息且阻塞任务缠上
适用于:消息产生源是同一处理器中的另一线程时。该策略会让消息的产生源慢下来,适应消息的处理速度,保证不会有任务丢失。
四、问题&解答
问题
例子程序在运行的时候出现:数据会乱序 和 数据丢失的现象。(下面是例子程序的主要代码,与上一篇代码完全相同,此处贴出,是为了阅读方便)
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading; 6 using Microsoft.Ccr.Core; 7 8 namespace CCRDemo1 9 { class Program { static void Main(string[] args) { int maxiQueueDepth = ; // step1: 创建一个Dispatcher对象 Dispatcher dispatcher = new Dispatcher(, "调度器名称"); // step2: 创建一个与step1创建对象关联的DispatcherQueue对象 DispatcherQueue depthThrottledQueue = new DispatcherQueue( "任务队列的名称", // 关联到该队列的调度器 dispatcher, // 队列保存数据的策略:保存最近消息策略 TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, // 队列的深度 maxiQueueDepth ); // step3: 创建一个能够接收整型数据的Port Port<int> intPort = new Port<int>(); // step4: 把Port与处理函数关联,然后再与DispatcherQueue关联 Arbiter.Activate(depthThrottledQueue, Arbiter.Receive(true, intPort, delegate(int i) // 这里用了一个匿名方法,作为处理函数 { Thread.Sleep(); Console.WriteLine("[{0}] {1}", DateTime.Now.ToString("o"), i); } ) ); // step5: 快速的提交大量的任务 Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o")); for (int i = ; i < maxiQueueDepth * ; i++) { // 把数据Post到intPort内 intPort.Post(i); } Console.WriteLine("[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o")); Console.WriteLine("Press any key to exit"); Console.ReadKey(); dispatcher.Dispose(); } } }
解答
1、数据乱序问题
CCR内部创建了多线程池来执行这个匿名方法,而且执行的方式是并发、异步,因此改匿名方法打印出来的数字的顺序自然就应该是不可预知的,也就是说:顺序是乱的;
2、数据丢失问题
这
个也是正常的,例子代码在瞬间提交的大量数据,提交的速度,远远超过匿名方法处理的速度(里面sleep了2秒),因此这意味着会有大量的任务堆积在
DispatcherQueue内,然而,改队列在创建的时候,已经指明了任务调度策略
为:TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,因此DispatcherQueue内
只会保存最新的任务,旧的就会丢失。
CCR的心脏Dispatcher。
在我们使用Dispatcher的时候,最常用到的就是他的构造函数和资源释放函数了,因此本节,先介绍这2类函数,其他的函数因为涉及到DispatcherQueue等内部的运作机制,属于CCR内部的运作调度机制,留待前面这些基础知识交代完毕后再另起一篇细说。
Dispatcher内含的方法、属性预览
public sealed class Dispatcher : IDisposable { // 1、构造函数 public Dispatcher(); public Dispatcher(int threadCount, string threadPoolName); public Dispatcher(int threadCount, ThreadPriority priority, bool useBackgroundThreads, string threadPoolName); public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, string threadPoolName); public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, ApartmentState threadApartmentState, string threadPoolName); // 2、资源释放函数 public void Dispose(); // 3、属性 public static ICollection<Causality> ActiveCausalities { get; } public List<DispatcherQueue> DispatcherQueues { get; } public string Name { get; set; } public DispatcherOptions Options { get; set; } public int PendingTaskCount { get; set; } public long ProcessedTaskCount { get; set; } public static int ThreadsPerCpu { get; set; } public int WorkerThreadCount { get; set; } // 4、因果关系函数 public static void AddCausality(Causality causality); public static void AddCausalityBreak(); public static void ClearCausalities(); public static bool RemoveCausality(Causality causality); public static bool RemoveCausality(string name); }
一、构造函数
Dispatcher给我们提供了5个构造函数,其实是为了方便大家使用而对一个构造函数做的封装,我们看看具体的实现代码(我在关键的地方都加上了中文的注释):
1/// <summary> 2/// Constructs a Dispatcher instance using the default number of threads and no friendly tag 3/// 使用默认线程数和空名作为参数构造一个Dispatcher实例 4/// </summary> 5public Dispatcher() 6 : this(, null) 7{ 8} 9 10/// <summary> 11/// Constructs a Dispatcher instance. 12/// The instance is usable only after AddPort is called at least once 13/// 该实例仅在调用AddPort之后才生效 14/// </summary> 15/// <param name="threadCount"> 16/// Number of OS threads to use for processing CCR Tasks 17/// 处理CCR任务的操作系统线程数 18/// </param> 19/// <param name="threadPoolName"> 20/// Friendly name to use for the OS Threads and this dispatcher instance 21/// Dispatcher实例和系统线程的名称 22/// </param> 23public Dispatcher(int threadCount, string threadPoolName) 24 : this(threadCount, ThreadPriority.Normal, DispatcherOptions.None, threadPoolName) 25{ 26} 27 28/// <summary> 29/// Constructs a Dispatcher instance. 30/// The instance is usable only after AddPort is called at least once 31/// </summary> 32/// <param name="threadCount"> 33/// Number of OS threads to use for processing CCR Tasks 34/// </param> 35/// <param name="priority"> 36/// OS Thread priority to use for threads exexuting CCR tasks 37/// 系统线程执行CCR任务所使用的优先级 38/// </param> 39/// <param name="options"> 40/// Dispatcher scheduling options 41/// 调度选项 42/// </param> 43/// <param name="threadPoolName"> 44/// Friendly name to use for the OS Threads and this dispatcher instance 45/// </param> 46public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, string threadPoolName) 47 : this(threadCount, priority, options, ApartmentState.Unknown, threadPoolName) 48{ 49} 50 51/// <summary> 52/// Constructs a Dispatcher instance. 53/// The instance is usable only after AddPort is called at least once 54/// </summary> 55/// <param name="threadCount"> 56/// Number of OS threads to use for processing CCR Tasks 57/// </param> 58/// <param name="priority"> 59/// OS Thread priority to use for threads exexuting CCR tasks 60/// </param> 61/// <param name="useBackgroundThreads"> 62/// If true, background threads are used, which do not prevent application exit 63/// 是否使用后台线程,若是用,则应用程序可以自由的退出,而不用担心 64/// </param> 65/// <param name="threadPoolName"> 66/// Friendly name to use for the OS Threads and this dispatcher instance 67/// </param> 68public Dispatcher(int threadCount, ThreadPriority priority, bool useBackgroundThreads, string threadPoolName) 69 : this(threadCount, 70 priority, 71 useBackgroundThreads ? DispatcherOptions.UseBackgroundThreads : DispatcherOptions.None, 72 threadPoolName) 73{ 74} 75 76/// <summary> 77/// Constructs a Dispatcher instance. 78/// The instance is usable only after AddPort is called at least once 79/// </summary> 80/// <param name="threadCount"> 81/// Number of OS threads to use for processing CCR Tasks 82/// </param> 83/// <param name="priority"> 84/// OS Thread priority to use for threads exexuting CCR tasks 85/// </param> 86/// <param name="options"> 87/// Dispatcher scheduling options 88/// </param> 89/// <param name="threadApartmentState"> 90/// Thread apartment state. Use ApartmentState.Unknown when STA/MTA is not required for interop 91/// 线程单元状态,当COM interop不需要STA/MTA的时候可以使用ApartmentState.Unknown 92/// </param> 93/// <param name="threadPoolName"> 94/// Friendly name to use for the OS Threads and this dispatcher instance 95/// </param> 96public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, 97 ApartmentState threadApartmentState, string threadPoolName) 98{ 99 this._startupCompleteEvent = new ManualResetEvent(false); this._dispatcherQueues = new List<DispatcherQueue>(); this._taskExecutionWorkers = new List<TaskExecutionWorker>(); this._nameToQueueTable = new Dictionary<string, DispatcherQueue>(); // 线程数 if (threadCount == ) {// 默认情况下 threadCount = Math.Max(NumberOfProcessorsInternal, ) * ThreadsPerCpu; } else if (threadCount < ) { throw new ArgumentException("Cannot create a negative number of threads. Pass 0 to use default.", "threadCount"); } if (threadPoolName == null) { this._name = string.Empty; } else { this._name = threadPoolName; } this._options = options; for (int i = ; i < threadCount; i++) { this.AddWorker(priority, threadApartmentState); } this.StartWorkers(); } /// <summary> /// Creates one TaskExecutionWorker instance associated with one OS thread /// 创建一个与线程想关联的TaskExecutionWorker实例 /// </summary> /// <remarks> /// This routine should only be called once per dispatcher instance /// 每个Dispatcher实例只能调用该函数一次 /// </remarks> private void AddWorker(ThreadPriority priority, ApartmentState apartmentState) { TaskExecutionWorker item = new TaskExecutionWorker(this); Thread thread = new Thread(new ThreadStart(item.ExecutionLoop)); thread.SetApartmentState(apartmentState); thread.Name = this._name; thread.Priority = priority; thread.IsBackground = DispatcherOptions.None < (this._options & DispatcherOptions.UseBackgroundThreads); item._thread = thread; this._taskExecutionWorkers.Add(item); this._cachedWorkerListCount++; }
二、Dispose函数
使用该函数才能够释放Dispatcher内创建的线程池等资源,记得在退出程序之前,调用它,内部的实现细节很简单,详细请参考代码。
1 /// <summary> 2 /// Stops all scheduling and disposes this dispatcher instance 3 /// 停止所有的调度并释放当前Dispatcher实例 4 /// </summary> 5 public void Dispose() 6 { 7 if (this._cachedWorkerListCount != ) 8 { 9 lock (this._taskExecutionWorkers) { foreach (TaskExecutionWorker worker in this._taskExecutionWorkers) { worker.Shutdown(); } this._cachedWorkerListCount = ; } if (this._startupCompleteEvent != null) { this._startupCompleteEvent.Close(); } this.Shutdown(true); } } /// <summary> /// Stops all dispatcher worker threads and cleans up the dispatcher /// 停止所有的工作线程并清理资源 /// </summary> private void Shutdown(bool wait) { //this.Dispose(); lock (this._taskExecutionWorkers) { this._hasShutdown = true; Monitor.PulseAll(this._taskExecutionWorkers); if (wait) { while (!this._hasShutdown) { Monitor.Wait(this._taskExecutionWorkers); } } } }
三、几个重要的枚举
1、 ThreadPriority: 指定Thread 的调度优先级。
public enum ThreadPriority { // 可以将 System.Threading.Thread 安排在具有任何其他优先级的线程之后。 Lowest = , // 可以将 System.Threading.Thread 安排在具有 Normal 优先级的线程之后, // 在具有 Lowest 优先级的线程之前。 BelowNormal = , // 可以将 System.Threading.Thread 安排在具有 AboveNormal 优先级的线程之后, // 在具有 BelowNormal 优先级的线程之前。默认情况下,线程具有 // Normal 优先级。 Normal = , // 可以将 System.Threading.Thread 安排在具有 Highest 优先级的线程之后, // 在具有 Normal 优先级的线程之前。 AboveNormal = , // 可以将 System.Threading.Thread 安排在具有任何其他优先级的线程之前。 Highest = , }
2、DispatcherOptions: 线程池运行选项
public enum DispatcherOptions { None, /// <summary> /// 时候后台线程 /// </summary> UseBackgroundThreads, /// <summary> /// 按照CPU来分配线程 /// </summary> UseProcessorAffinity }
3、ApartmentState: 指定 System.Threading.Thread 的单元状态
public enum ApartmentState { // System.Threading.Thread 将创建并进入一个单线程单元。 STA = , // System.Threading.Thread 将创建并进入一个多线程单元。 MTA = , // 尚未设置 System.Threading.Thread.ApartmentState 属性。 Unknown = , }
DispatcherQueue是整个CCR中,唯一能让用户与Dispatcher交互的地方,所有外界想与Dispatcher交互都只能直接或间接通过DispatcherQueu来做,可见DispatcherQueue在整个CCR中处于“咽喉”之地。
二、预览 DispatcherQueue类内,主要实现了5部分的功能: 1、构造实例对象; public DispatcherQueue(); public DispatcherQueue(string name); public DispatcherQueue(string name, Dispatcher dispatcher); public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate); public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth); 2、销毁相关资源;
public void Dispose(); protected virtual void Dispose(bool disposing);
3、任务进队、出队操作;
public virtual bool Enqueue(ITask task); public virtual bool TryDequeue(out ITask task);
4、队列状态控制;
public virtual void Suspend(); public virtual void Resume();
5、因果关系相关控制;
public virtual Timer EnqueueTimer(TimeSpan timeSpan, Port<DateTime> timerPort); 这5个功能中,常用的是前面4个,因此本基础篇就只讲这4个功能相关的函数的使用和实现原理,最后一个功能不常用,计划与Dispatcher内的因果关系部分放在后面作为高级篇细说。
三、构造函数 DispatcherQueue类内根据所使用的线程池的种类不同,而分为2类: 1、一类是使用CLR的线程池的构造函数: /// <summary> /// Default constructor /// 默认构造函数 /// </summary> public DispatcherQueue() : this("Unnamed queue using CLR Threadpool") { } /// <summary> /// Constructs an instance of the dispatcher port using the CLR thread pool for task execution /// 构建一个不使用CCR的线程池,而是使用CLR线程池执行任务的实例 /// </summary> /// <param name="name"> /// 名称 /// </param> public DispatcherQueue(string name) { this._taskQueue = new Store<ITask>(); this._timescale = 1.0; this._timerTable = new Dictionary<long, Timer>(); this._name = name; } 2、一类是使用CCR线程池(也即操作系统线程池)的构造函数: /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name"></param> /// <param name="dispatcher"> /// 指定的Dispatcher对象 /// </param> public DispatcherQueue(string name, Dispatcher dispatcher) : this(name, dispatcher, TaskExecutionPolicy.Unconstrained, , 1.0) { } /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name">Friendly name</param> /// <param name="dispatcher"> /// Dispatcher instance for executing tasks /// 执行任务的Dispatcher实例 /// </param> /// <param name="policy"> /// Task scheduling policy /// 任务调度策略 /// </param> /// <param name="schedulingRate"> /// Average desired scheduling rate, in tasks per second. /// 期望的任务平均调度速率(每秒执行几个任务) /// Only valid when appropriate policy is specified /// 仅当指定对应策略的时候才生效 /// </param> public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate) : this(name, dispatcher, policy, , schedulingRate) { } /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name"> /// Friendly name /// </param> /// <param name="dispatcher"> /// Dispatcher instance for executing tasks /// </param> /// <param name="policy"> /// Task scheduling policy /// </param> /// <param name="maximumQueueDepth"> /// Maximum number of pending tasks. /// 最大待处理任务数 /// Only valid when appropriate policy is specified /// 仅当指定对应策略的时候才生效 /// </param> public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth) : this(name, dispatcher, policy, maximumQueueDepth, 0.0) { } /// <summary> /// Constructs an instance of the dispatcher port using the specified CCR dispatcher /// </summary> /// <param name="name"> /// Friendly name /// </param> /// <param name="dispatcher"> /// Dispatcher instance for executing tasks /// </param> /// <param name="policy"> /// Task scheduling policy /// </param> /// <param name="maximumQueueDepth"> /// Maximum number of pending tasks. /// Only used when appropriate policy is specified /// </param> /// <param name="schedulingRate"> /// Average desired scheduling rate, in tasks per second. /// Only used when appropriate policy is specified /// </param> private DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth, double schedulingRate) { // 1.初始化 任务队列、时间刻度、定时器表 this._taskQueue = new Store<ITask>(); this._timescale = 1.0; this._timerTable = new Dictionary<long, Timer>(); // 2.初始化 任务调度策略、 if (dispatcher == null) { throw new ArgumentNullException("dispatcher"); } if (((policy == TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks) || (policy == TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution)) && (maximumQueueDepth <= )) { throw new ArgumentOutOfRangeException("maximumQueueDepth"); } if (((policy == TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) || (policy == TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution)) && (schedulingRate <= 0.0)) { throw new ArgumentOutOfRangeException("schedulingRate"); } this._dispatcher = dispatcher; this._name = name; this._policy = policy; this._maximumQueueDepth = maximumQueueDepth; this._maximumSchedulingRate = schedulingRate; // 3.把DispatcherQueue关联到指定的Dispatcher上 dispatcher.AddQueue(name, this); // 4.判断是否需要开启CCR秒表 if (policy >= TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) { this._watch = CcrStopwatch.StartNew(); } }
四、资源释放函数 DispatcherQueue内含任务队列,因此也需要做资源的释放,而且改函数的调用应该在Diapatcher的Dispose函数调用之前,详细原因看代码: /// <summary> /// Dispose releases resources associated with this instance /// 释放DispatcherQueue内的相关资源 /// </summary> public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } /// <summary> /// Implementation of dispose /// </summary> /// <param name="disposing"></param> protected virtual void Dispose(bool disposing) { // 移除对应Dispatcher与当前DispatcherQueue的关联 if ((disposing && (this._dispatcher != null)) && !this._dispatcher.RemoveQueue(this._name)) { // 释放任务队列内未处理的任务 int elementCount = ; lock (this._taskQueue) { elementCount = this._taskQueue.ElementCount; this._taskQueue = null; } // 调整对应Dispatcher内的未处理任务数 this._dispatcher.AdjustPendingCount(-elementCount); } }
五、任务操作函数 用户除了可以通过Port向DispatcherQueue推入任务外,还可以自己生产ITask任务,然后把它推入DiapatcherQueue内调度执行。为此DispatcherQueue具备了进队、出队2个功能。而且前面第三篇所提到的任务调度策略,也是在进队这个函数内实现的。 /// <summary> /// Enqueue ITask instance /// 任务实例进队 /// </summary> /// <exception cref="T:Microsoft.Ccr.Core.PortNotFoundException"> /// Thrown if message type is not derived from ITask /// 若消息没有实现ITask接口,则抛出异常 /// </exception> /// <param name="task"> /// ITask instance /// 任务实例 /// </param> public virtual bool Enqueue(ITask task) { bool flag = true; // 1.空消息异常 if (task == null) { throw new ArgumentNullException("message"); } task.TaskQueue = this; // 2.CCR线程池 if (this._dispatcher == null) { lock (this._taskQueue) { this._scheduledTaskCount += 1L; } ThreadPool.QueueUserWorkItem(new WaitCallback(TaskExecutionWorker.ThreadPoolExecute), task); return flag; } // 3.任务进队列 if (this._taskQueue == null) { throw new ObjectDisposedException(typeof(DispatcherQueue).Name + ":" + this.Name); } lock (this._taskQueue) { // 4.根据不同的任务调度策略,来把任务插入队列不同的位置 int num; switch (this._policy) { case TaskExecutionPolicy.Unconstrained: this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; case TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks: if (this._taskQueue.ElementCount >= this._maximumQueueDepth) { Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Discarding oldest task because queue depth limit reached"); this._taskQueue.ElementListRemoveFirst(); Interlocked.Decrement(ref this._dispatcher._pendingTaskCount); flag = false; } this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; case TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution: if (this._taskQueue.ElementCount < this._maximumQueueDepth) { goto Label_0172; } Microsoft.Ccr.Core.Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Forcing thread sleep because queue depth limit reached"); goto Label_015F; case TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks: this.RecalculateSchedulingRate(); if (this._currentSchedulingRate >= this._maximumSchedulingRate) { Microsoft.Ccr.Core.Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Discarding task because task scheduling rate exceeded"); this._taskQueue.ElementListRemoveFirst(); Interlocked.Decrement(ref this._dispatcher._pendingTaskCount); this.RecalculateSchedulingRate(); flag = false; } this._scheduledItems++; this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; case TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution: this.RecalculateSchedulingRate(); if (this._currentSchedulingRate < this._maximumSchedulingRate) { goto Label_025E; } num = ; Microsoft.Ccr.Core.Dispatcher.LogInfo( "DispatcherQueue.Enqueue: Forcing thread sleep because task scheduling rate exceeded"); goto Label_0250; default: goto Label_0285; } Label_0142: Monitor.Exit(this._taskQueue); Thread.Sleep(); Monitor.Enter(this._taskQueue); Label_015F: if (this._taskQueue.ElementCount >= this._maximumQueueDepth) { goto Label_0142; } Label_0172: this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); goto Label_0285; Label_0219: Monitor.Exit(this._taskQueue); Thread.Sleep( + num); num *= ; if (num > 0x3e8) { num = 0x3e8; } Monitor.Enter(this._taskQueue); this.RecalculateSchedulingRate(); Label_0250: if (this._currentSchedulingRate > this._maximumSchedulingRate) { goto Label_0219; } Label_025E: this._scheduledItems++; this._taskQueue.ElementListAddLast(new PortElement<ITask>(task)); Label_0285: this._scheduledTaskCount += 1L; } this._dispatcher.Signal(); return flag; } // 重新计算调度速率 private void RecalculateSchedulingRate() { this._currentSchedulingRate = this._scheduledItems / this._watch.Elapsed.TotalSeconds; } /// <summary> /// Atomically removes an ITask instance if the port is non empty /// 若当前待执行任务队列不空,则原子地取走一个任务 /// </summary> /// <param name="task"> /// ITask instance if port is not empty. Null otherwise /// 任务实例 /// </param> /// <returns> /// True if port is not empty /// 若队列不空则返回True /// </returns> public virtual bool TryDequeue(out ITask task) { // 1.异常判断 if (this._dispatcher == null) { // 不支持CCR线程池 throw new InvalidOperationException(Resource1.DispatcherPortTestNotValidInThreadpoolMode); } if (this._taskQueue == null) { // 任务队列已经被释放了 throw new ObjectDisposedException(typeof(DispatcherQueue).Name + ":" + this.Name); } // 2.从任务队列的头部取一个任务 lock (this._taskQueue) { // 判断是否挂起 if (this._isSuspended) { task = null; return false; } if (this._taskQueue.ElementCount > ) { task = this._taskQueue.ElementListRemoveFirst().TypedItem; } else { task = null; return false; } } Interlocked.Decrement(ref this._dispatcher._pendingTaskCount); return true; }
六、运行状态控制函数 DispatcherQueue提供了挂起、恢复的操作,以调度线程池对任务的运行,不过要注意的是,挂起状态下,用户仍然可以向DispatcherQueue提交任务。 /// <summary> /// Suspend scheduling of tasks. Tasks can still be queued /// 挂起调度中的任务,但任务仍然保持排列 /// </summary> public virtual void Suspend() { lock (this._taskQueue) { this._isSuspended = true; } } /// <summary> /// Resumes execution of tasks, including any tasks queued while in paused state /// 恢复任务的执行,包含所有处于暂停状态的排队任务 /// </summary> public virtual void Resume() { lock (this._taskQueue) { this._isSuspended = false; } this._dispatcher.Signal(); }