我有一个线程,它创建可变数量的工作线程并在它们之间分配任务。通过向线程传递TaskQueue对象可以解决此问题,您将在下面看到其实现。

这些工作线程只是简单地遍历给定的TaskQueue对象,从而执行每个任务。

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

如您所见,我正在使用 AutoResetEvent 对象来确保工作线程不会过早退出,即在执行任何任务之前。

简而言之:
  • 主线程通过将任务Enqeueue到其TaskQueue中来将任务分配给线程
  • 主线程通过调用TaskQueue的Finish()方法
  • 来通知不再执行任务的线程
  • 工作线程通过调用TaskQueue的Dequeue()方法
  • 检索分配给它的下一个任务

    问题在于,出队()方法通常抛出InvalidOperationException ,称队列为空。如您所见,我添加了一些日志记录,事实证明,即使没有对其Set()方法的调用,AutoResetEvent也不会阻止Dequeue()。

    据我了解,调用AutoResetEvent.Set()将允许一个等待线程继续运行(该线程先前称为AutoResetEvent.WaitOne()),然后自动调用AutoResetEvent.Reset(),从而阻止了下一个服务员。

    那么有什么问题呢?我做错什么了吗?我在某处有错误吗?
    我现在在上面坐了3个小时,但我不知道出了什么问题。
    请帮我!

    非常感谢你!

    最佳答案

    您的出队代码不正确。您检查“计数”是否处于锁定状态,然后沿着裤子的接缝飞行,然后您期望任务中会有一些东西。释放锁时,您无法保留假设:)。您的计数检查和任务。出站必须在锁定下发生:

    bool TryDequeue(out Tasks task)
    {
      task = null;
      lock (this.tasks) {
        if (0 < tasks.Count) {
          task = tasks.Dequeue();
        }
      }
      if (null == task) {
        Log.Trace ("Queue was empty");
      }
      return null != task;
     }
    

    您的Enqueue()代码同样充满问题。入队/出队不能确保进度(即使队列中有项目,出队线程也会被阻塞等待)。您的Enqueue()签名错误。总体而言,您的帖子的代码非常非常糟糕。坦白说,我认为您正在尝试的咀嚼能力超过了您在这里可以忍受的能力...哦,永远不要登录锁。

    我强烈建议您只使用ConcurrentQueue

    如果您无权访问.Net 4.0,则可以通过以下实现开始:
    public class ConcurrentQueue<T>:IEnumerable<T>
    {
        volatile bool fFinished = false;
        ManualResetEvent eventAdded = new ManualResetEvent(false);
        private Queue<T> queue = new Queue<T>();
        private object syncRoot = new object();
    
        public void SetFinished()
        {
            lock (syncRoot)
            {
                fFinished = true;
                eventAdded.Set();
            }
        }
    
        public void Enqueue(T t)
        {
            Debug.Assert (false == fFinished);
            lock (syncRoot)
            {
                queue.Enqueue(t);
                eventAdded.Set();
            }
        }
    
        private bool Dequeue(out T t)
        {
            do
            {
                lock (syncRoot)
                {
                    if (0 < queue.Count)
                    {
                        t = queue.Dequeue();
                        return true;
                    }
                    if (false == fFinished)
                    {
                        eventAdded.Reset ();
                    }
                }
                if (false == fFinished)
                {
                    eventAdded.WaitOne();
                }
                else
                {
                    break;
                }
            } while (true);
            t = default(T);
            return false;
        }
    
    
        public IEnumerator<T> GetEnumerator()
        {
            T t;
            while (Dequeue(out t))
            {
                yield return t;
            }
        }
    
        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }
    

    关于c# - AutoResetEvent没有正确阻止,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/3568432/

    10-11 17:05