我需要使用并发优先级队列,并且我正在考虑调整MSDN上How to: Add Bounding and Blocking Functionality to a Collection教程中提供的SimplePriorityQueue<TPriority, TValue>示例。但是,令我惊讶的是,该样本似乎存在严重的错误。有人可以验证这些问题是否确实存在吗?

1)TryAddToArray之间存在种族冲突,这可能会导致从后者抛出ArgumentExceptionTryAdd方法首先将一个项目添加到内部队列,然后递增m_count计数器。另一方面,ToArray首先初始化一个大小为m_count的新数组,然后将内部队列复制到该数组上。如果在执行TryAdd时调用了ToArray,则ToArray可能最终试图复制比其在数组中分配的空间更多的项目,从而导致 CopyTo 调用抛出ArgumentException

private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues;
private int m_count;

// ...

// IProducerConsumerCollection members
public bool TryAdd(KeyValuePair<int, TValue> item)
{
    _queues[item.Key].Enqueue(item);
    Interlocked.Increment(ref m_count);
    return true;
}

public int Count
{
    get { return m_count; }
}

public KeyValuePair<int, TValue>[] ToArray()
{
    KeyValuePair<int, TValue>[] result;

    lock (_queues)
    {
        result = new KeyValuePair<int, TValue>[this.Count];
        // *** context switch here; new item gets added ***
        int index = 0;
        foreach (var q in _queues)
        {
            if (q.Count > 0)
            {
                q.CopyTo(result, index);   // *** ArgumentException ***
                index += q.Count;
            }
        }
        return result;
    }
}

2) GetEnumerator方法中存在另一个种族危险:内部队列的更新之间没有同步。
public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator()
{
    for (int i = 0; i < priorityCount; i++)
    {
        foreach (var item in _queues[i])
            yield return item;
    }
}

考虑以下代码片段,该片段从队列中取出一个项目并以增加的优先级将其重新添加:
if (queue.TryTake(out item) && item.Key < maxPriority - 1)
    queue.TryAdd(new KeyValuePair<int, string>(item.Key + 1, item.Value))

如果上述代码段是与枚举同时运行的,则可以期望该项目最多显示一次,要么是原始的,要么是优先级递增的,或者可能根本不出现。人们不会期望该项目在两个优先事项上都出现两次。但是,由于GetEnumerator顺序地对其内部队列进行迭代,因此它无法防止跨队列的这种顺序不一致。

3)公共(public)Count属性可以返回陈旧的值,因为它读取共享的m_count字段而没有任何内存限制。如果使用者在一个循环中访问此属性,而该循环不会生成自身的内存隔离,如下所示,则尽管其他线程将项目添加到了队列中,但它们仍可能陷入无限循环。
while (queue.Count == 0)
{ }

在其他几篇文章中还讨论了读取共享变量时对内存隔离的需求:
  • How to correctly read an Interlocked.Increment'ed int field?
  • Reading an int that's updated by Interlocked on other threads
  • Do concurrent interlocked and reads require a memory barrier or locking?

  • 4)_queues数组的初始化与SimplePriorityQueue构造函数的完成之间没有内存障碍。当另一个线程上的外部使用者调用TryAdd并在初始化完成之前(或在其内存高速缓存中显示为已完成)访问_queues时,可能会引起竞争危险。我在关于constructors and memory barriers的另一个问题中对此进行了进一步讨论。

    5) TryTakeToArray通过使用lock关键字得到保护。除了不足(由于上面讨论的错误)之外,这还使设计并发集合的整个目的无法实现。鉴于其缺点,我认为最好的方法是将内部ConcurrentQueue结构降级为纯Queue,在各处添加锁,然后开始将其视为非并行但线程安全的结构。

    最佳答案

    我认为这取决于您将如何使用该类。如果将自己限制为TryAddTryTake(这是BlockingCollection<T>所依赖的两个主要方面),那么您应该不会有任何问题,并且将拥有非常快的锁定最小优先级队列。

    如果您开始使用CountCopyTo或任何其他方法,则可能会遇到您指出的问题。

    关于c# - MSDN上的SimplePriorityQueue示例中的严重错误,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38836447/

    10-11 01:12