我需要使用并发优先级队列,并且我正在考虑调整MSDN上How to: Add Bounding and Blocking Functionality to a Collection教程中提供的SimplePriorityQueue<TPriority, TValue>
示例。但是,令我惊讶的是,该样本似乎存在严重的错误。有人可以验证这些问题是否确实存在吗?
1)在TryAdd
和ToArray
之间存在种族冲突,这可能会导致从后者抛出ArgumentException
。 TryAdd
方法首先将一个项目添加到内部队列,然后递增m_count
计数器。另一方面,ToArray
首先初始化一个大小为m_count
的新数组,然后将内部队列复制到该数组上。如果在执行TryAdd
时调用了ToArray
,则ToArray
可能最终试图复制比其在数组中分配的空间更多的项目,从而导致 CopyTo
调用抛出ArgumentException
。
private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues;
private int m_count;
// ...
// IProducerConsumerCollection members
public bool TryAdd(KeyValuePair<int, TValue> item)
{
_queues[item.Key].Enqueue(item);
Interlocked.Increment(ref m_count);
return true;
}
public int Count
{
get { return m_count; }
}
public KeyValuePair<int, TValue>[] ToArray()
{
KeyValuePair<int, TValue>[] result;
lock (_queues)
{
result = new KeyValuePair<int, TValue>[this.Count];
// *** context switch here; new item gets added ***
int index = 0;
foreach (var q in _queues)
{
if (q.Count > 0)
{
q.CopyTo(result, index); // *** ArgumentException ***
index += q.Count;
}
}
return result;
}
}
2)
GetEnumerator
方法中存在另一个种族危险:内部队列的更新之间没有同步。public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator()
{
for (int i = 0; i < priorityCount; i++)
{
foreach (var item in _queues[i])
yield return item;
}
}
考虑以下代码片段,该片段从队列中取出一个项目并以增加的优先级将其重新添加:
if (queue.TryTake(out item) && item.Key < maxPriority - 1)
queue.TryAdd(new KeyValuePair<int, string>(item.Key + 1, item.Value))
如果上述代码段是与枚举同时运行的,则可以期望该项目最多显示一次,要么是原始的,要么是优先级递增的,或者可能根本不出现。人们不会期望该项目在两个优先事项上都出现两次。但是,由于
GetEnumerator
顺序地对其内部队列进行迭代,因此它无法防止跨队列的这种顺序不一致。3)公共(public)
Count
属性可以返回陈旧的值,因为它读取共享的m_count
字段而没有任何内存限制。如果使用者在一个循环中访问此属性,而该循环不会生成自身的内存隔离,如下所示,则尽管其他线程将项目添加到了队列中,但它们仍可能陷入无限循环。while (queue.Count == 0)
{ }
在其他几篇文章中还讨论了读取共享变量时对内存隔离的需求:
4)在
_queues
数组的初始化与SimplePriorityQueue
构造函数的完成之间没有内存障碍。当另一个线程上的外部使用者调用TryAdd
并在初始化完成之前(或在其内存高速缓存中显示为已完成)访问_queues
时,可能会引起竞争危险。我在关于constructors and memory barriers的另一个问题中对此进行了进一步讨论。5)
TryTake
和ToArray
通过使用lock
关键字得到保护。除了不足(由于上面讨论的错误)之外,这还使设计并发集合的整个目的无法实现。鉴于其缺点,我认为最好的方法是将内部ConcurrentQueue
结构降级为纯Queue
,在各处添加锁,然后开始将其视为非并行但线程安全的结构。 最佳答案
我认为这取决于您将如何使用该类。如果将自己限制为TryAdd
和TryTake
(这是BlockingCollection<T>
所依赖的两个主要方面),那么您应该不会有任何问题,并且将拥有非常快的锁定最小优先级队列。
如果您开始使用Count
,CopyTo
或任何其他方法,则可能会遇到您指出的问题。
关于c# - MSDN上的SimplePriorityQueue示例中的严重错误,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38836447/