转载至 https://www.codeproject.com/Articles/28785/Thread-synchronization-Wait-and-Pulse-demystified
/*
* 这是一个完整源代码示例,演示了此模式的多功能性。它实现了一个可以停止的阻塞队列。
* 阻塞队列是固定大小的队列。如果队列已满,则尝试添加项目将被阻止。
* 如果队列为空,则尝试删除项目将阻止。
* 当Quit()被调用时,队列停止。这意味着您无法再添加任何项目,但可以删除现有项目,直到队列为空。此时,队列已完成。
* 这是一组非常复杂的条件。您可以使用更高级别的结构组合来实现它,但它会更难。该模式使得该实现相对简单。
*
*/
using System;
using System.Threading;
using System.Collections.Generic; namespace TestBlockQueue
{
public class BlockingQueue<T>
{
readonly int _Size = ;
readonly Queue<T> _Queue = new Queue<T>();
readonly object _Key = new object();
bool _Quit = false; public BlockingQueue(int size)
{
_Size = size;
} public void Quit()
{
lock (_Key)
{
_Quit = true; Monitor.PulseAll(_Key);
}
} public bool Enqueue(T t)
{
lock (_Key)
{
while (!_Quit && _Queue.Count >= _Size) Monitor.Wait(_Key); if (_Quit) return false; _Queue.Enqueue(t); Monitor.PulseAll(_Key);
} return true;
} public bool Dequeue(out T t)
{
t = default(T); lock (_Key)
{
while (!_Quit && _Queue.Count == ) Monitor.Wait(_Key); if (_Queue.Count == ) return false; t = _Queue.Dequeue(); Monitor.PulseAll(_Key);
} return true;
}
} /// <summary>
/// 对于任意数量的生产者和消费者的并发访问,此实现是安全的。以下是一个快速生产者和两个慢速消费者的示例
/// </summary>
public class Program
{
public static void Main(string[] args)
{
var q = new BlockingQueue<int>(); // Producer
new Thread(() =>
{
for (int x = ; ; x++)
{
if (!q.Enqueue(x)) break;
Console.WriteLine(x.ToString("") + " >");
}
Console.WriteLine("Producer quitting");
}).Start(); // Consumers
for (int i = ; i < ; i++)
{
new Thread(() =>
{
for (; ; )
{
Thread.Sleep();
int x = ;
if (!q.Dequeue(out x)) break;
Console.WriteLine(" < " + x.ToString(""));
}
Console.WriteLine("Consumer quitting");
}).Start();
} Thread.Sleep(); Console.WriteLine("Quitting"); q.Quit();
}
}
}