我有4个线程。一种是从网络中读取一些信息,然后将其写入变量中,并应在每条信息之后发出信号。他们中的3个正在读取此变量,应该准确地读取一次。当前的解决方案是编写者在写入事件后设置事件,然后等待读取者事件。读者等待事件,然后阅读并设置他们的事件(即他们所阅读的意思)。问题是读者可以阅读多次,而我也有重复。我如何才能实现规则,使读者只能阅读一次?

最佳答案

实现此目的的一种方法如下

数据在线程之间共享为一个单链表。列表中的每个节点都可以是标记或具有数据。列表从键入标记的单个节点开始。读取数据后,将形成一个新列表,该列表具有一系列数据节点,后跟一个标记。该列表被附加到添加到列表的最新标记中。

每个阅读器线程均以对原始标记节点的引用和一个AutoResetEvent开始。每当写入器中出现新数据时,它将为每个读取器线程发送AutoResetEvent信号。然后,读取器线程将简单地走直到找到没有下一个节点的标记。

此方案可确保所有阅读器仅看到一次数据。最大的麻烦是构造列表,以便可以以无锁方式将其写入和读取。虽然这是Interlocked.CompareExchange相当简单

链表类型

class Node<T> {
  public bool IsMarker;
  public T Data;
  public Node<T> Next;
}

样本作者类型
class Writer<T> {
  private List<AutoResetEvent> m_list;
  private Node<T> m_lastMarker;

  public Writer(List<AutoResetEvent> list, Node<T> marker) {
    m_lastMarker = marker;
    m_list = list;
  }

  // Assuming this can't overlap.  If this can overload then you will
  // need synchronization in this method around the writing of
  // m_lastMarker
  void OnDataRead(T[] items) {
    if (items.Length == 0) {
      return;
    }

    // Build up a linked list of the new data followed by a
    // Marker to signify the end of the data.
    var head = new Node<T>() { Data = items[0] };
    var current = head;
    for (int i = 1; i < items.Length; i++) {
      current.Next = new Node<T>{ Data = items[i] };
      current = current.Next;
    }
    var marker = new Node<T> { IsMarker = true };
    current.Next = marker;

    // Append the list to the end of the last marker node the writer
    // created
    m_lastMarker.Next = head;
    m_lastMarker = marker;

    // Tell each of the readers that there is new data
    foreach (var e in m_list) {
      e.Set();
    }
  }
}

sample 阅读器类型
class Reader<T> {
  private AutoResetEvent m_event;
  private Node<T> m_marker;

  void Go() {
    while(true) {
      m_event.WaitOne();
      var current = m_marker.Next;
      while (current != null) {
        if (current.IsMarker) {
          // Found a new marker.  Always record the marker because it may
          // be the last marker in the chain
          m_marker = current;
        } else {
          // Actually process the data
          ProcessData(current.Data);
        }
        current = current.Next;
      }
    }
  }
}

09-30 15:08
查看更多