本文介绍了具有固定大小FIFO队列的生产者/消费者模式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要围绕固定大小的FIFO队列实现生产者/消费者模式.我认为围绕ConcurrentQueue的包装器类可能对此有效,但我不确定(并且我之前从未使用过ConcurrentQueue).与此不同的是,队列仅需要容纳固定数量的项目(在我的情况下为字符串).我的应用程序将有一个生产者任务/线程和一个消费者任务/线程.当我的使用者任务运行时,它需要及时排出队列中存在的所有项目,并对其进行处理.

I need to implement the producer/consumer pattern around a fixed-size FIFO queue. I think a wrapper class around a ConcurrentQueue might work for this but I'm not completely sure (and I've never worked with a ConcurrentQueue before). The twist in this is that the queue needs to only hold a fixed number of items (strings, in my case). My application will have one producer task/thread and one consumer task/thread. When my consumer task runs, it needs to dequeue all of the items that exist in the queue at that moment in time and process them.

就其价值而言,我的客户处理排队的项目无非就是通过SOAP将其上传到并非100%可靠的Web应用程序.如果无法建立连接或调用SOAP调用失败,则应该丢弃这些项目,然后再返回队列以获取更多信息.由于SOAP的开销,我试图最大化可以在一个SOAP调用中发送的队列中的项目数.

For what it's worth, processing of the queued items by my consumer is nothing more than uploading them via SOAP to a web app that isn't 100% reliable. If the connection can't be established or the call SOAP call fails, I'm supposed to discard those items and go back to the queue for more. Because of the overhead of SOAP, I was trying to maximize the number of items from the queue that I could send in one SOAP call.

有时候,我的生产者添加商品的速度可能比我的消费者删除和处理商品的速度快.如果队列已满,并且生产者需要添加其他项目,则需要排队新项目,然后将最旧的项目出队,以使队列的大小保持固定.基本上,我需要一直保留队列中最新生产的商品(即使这意味着某些商品由于我的消费者当前正在处理以前的商品而没有被消耗).

At times, my producer may add items faster than my consumer is able to remove and process them. If the queue is already full and my producer needs to add another item, I need to enqueue the new item but then dequeue the oldest item so that the size of the queue remains fixed. Basically, I need to keep the most recent items that are produced in the queue at all time (even if it means some items don't get consumed because my consumer is currently processing previous items).

关于生产者在队列中的项目固定时保持编号的问题,我从这个问题中发现了一个潜在的想法:

With regard to the producer keeping the number if items in the queue fixed, I found one potential idea from this question:

固定大小的队列,该队列自动使旧队列出队新入伍者的价值观

我目前在ConcurrentQueue周围使用包装类(基于该答案),该包装类具有Enqueue()方法,如下所示:

I'm currently using a wrapper class (based on that answer) around a ConcurrentQueue with an Enqueue() method like this:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

我创建此类的实例,并在队列上设置大小限制,如下所示:

I create an instance of this class with a size limit on the queue like this:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动了生产者任务,它开始填充队列.当添加项目导致队列计数超过最大大小时,我的Enqueue()方法中的代码对于从队列中删除最旧的项目似乎正常工作.现在,我需要执行我的消费者任务来使项目出队并进行处理,但这是我的大脑感到困惑的地方.为我的使用者实现Dequeue方法的最佳方法是什么,该方法将在某个时刻拍摄队列快照并使所有项目出队(处理程序在此过程中生产者可能仍将项目添加到队列中)?

I start up my producer task and it begins filling the queue. The code in my Enqueue() method seems to be working properly with regard to removing the oldest item from the queue when adding an item causes the queue count to exceed the max size. Now I need my consumer task to dequeue items and process them but here's where my brain gets confused. What's the best way to implement a Dequeue method for my consumer that will take a snapshot of the queue at a moment in time and dequeue all items for processing (the producer may still be adding items to the queue during this process)?

推荐答案

简单地说,ConcurrentQueue有一个"ToArray"方法,当输入该方法时,它将锁定集合并生成队列中所有当前项目的快照" .如果希望为消费者提供一些需要处理的东西,则可以在返回数组之前,锁定入队方法具有的相同对象,调用ToArray(),然后旋转while(!queue.IsEmpty) queue.TryDequeue(out trash)循环以清除队列.您提取了.

Simply stated, the ConcurrentQueue has a "ToArray" method which, when entered, will lock the collection and produce a "snapshot" of all current items in the queue. If you want your consumer to be given a block of things to work on, you can lock the same object the enqueueing method has, Call ToArray(), and then spin through a while(!queue.IsEmpty) queue.TryDequeue(out trash) loop to clear the queue, before returning the array you extracted.

这将是您的GetAll()方法:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

由于您必须清除队列,因此可以简单地将这两个操作组合在一起;创建一个适当大小的数组(使用queue.Count),然后在队列不为空的情况下,使一个项目出队并将其放入数组中,然后再返回.

Since you have to clear out the queue, you could simply combine the two operations; create an array of the proper size (using queue.Count), then while the queue is not empty, Dequeue an item and put it in the array, before returning.

现在,这就是特定问题的答案.现在,我必须出于良心戴上我的CodeReview.SE帽子,并指出一些要点:

Now, that's the answer to the specific question. I must now in good conscience put on my CodeReview.SE hat and point out a few things:

  • 从不使用lock(this).您永远不知道还有哪些其他对象会将您的对象用作锁定焦点,因此当该对象从内部锁定自身时会被阻止.最佳实践是锁定一个私有范围的对象实例,通常是一个被锁定的对象实例:private readonly object syncObj = new object();

  • NEVER use lock(this). You never know what other objects may be using your object as a locking focus, and thus would be blocked when the object locks itself from the inside. The best practice is to lock a privately scoped object instance, usually one created just to be locked: private readonly object syncObj = new object();

由于您还是要锁定包装程序的关键部分,因此我将使用普通的List<T>而不是并发集合.访问速度更快,更容易清除,因此您可以比ConcurrentQueue允许的操作简单得多.要排队,请在索引零之前锁定同步对象Insert(),然后使用RemoveRange()从索引Size到列表的当前Count中删除所有项目.要出队,请锁定相同的同步对象,从Linq名称空间调用myList.ToArray();与ConcurrentQueue的功能几乎相同),然后在返回数组之前调用myList.Clear().再简单不过了:

Since you're locking critical sections of your wrapper anyway, I would use an ordinary List<T> instead of a concurrent collection. Access is faster, it's more easily cleaned out, so you'll be able to do what you're doing much more simply than ConcurrentQueue allows. To enqueue, lock the sync object, Insert() before index zero, then remove any items from index Size to the list's current Count using RemoveRange(). To dequeue, lock the same sync object, call myList.ToArray() (from the Linq namespace; does pretty much the same thing as ConcurrentQueue's does) and then call myList.Clear() before returning the array. Couldn't be simpler:

public class FixedSizeQueue<T>
{
private readonly List<T> queue = new List<T>();
private readonly object syncObj = new object();

public int Size { get; private set; }

public FixedSizeQueue(int size) { Size = size; }

public void Enqueue(T obj)
{
    lock (syncObj)
    {
        queue.Insert(0,obj)
        if(queue.Count > Size)
           queue.RemoveRange(Size, Count-Size);
    }
}

public T[] Dequeue()
{
    lock (syncObj)
    {
        var result = queue.ToArray();
        queue.Clear();
        return result;
    }
}
}

  • 您似乎知道您正在使用此模型丢弃排队的物品.这通常不是一件好事,但我愿意为您带来疑问的好处.但是,我要说的是,使用BlockingCollection实现这一目标的方法是无损的. BlockingCollection可包装任何IProducerConsumerCollection,包括大多数System.Collections.Concurrent类,并允许您指定队列的最大容量.然后,该集合将阻止尝试从空队列中出队的任何线程,或试图添加到完整队列中的任何线程,直到已添加或删除项以致于需要插入或插入空间为止.这是实现具有最大大小的生产者-消费者队列的最佳方法,否则将需要进行轮询"以查看是否有需要处理的东西.如果您走这条路线,那么只有消费者必须扔掉的东西才会被扔掉.消费者将看到生产者放入的所有行,并对每个行做出自己的决定.

  • You seem to understand that you are throwing enqueued items away using this model. That's usually not a good thing, but I'm willing to give you the benefit of the doubt. However, I will say there is a lossless way to achieve this, using a BlockingCollection. A BlockingCollection wraps any IProducerConsumerCollection including most System.Collections.Concurrent classes, and allows you to specify a maximum capacity for the queue. The collection will then block any thread attempting to dequeue from an empty queue, or any thread attempting to add to a full queue, until items have been added or removed such that there is something to get or room to insert. This is the best way to implement a producer-consumer queue with a maximum size, or one that would otherwise require "polling" to see if there's something for the consumer to work on. If you go this route, only the ones the consumer has to throw away are thrown away; the consumer will see all the rows the producer puts in and makes its own decision about each.

    这篇关于具有固定大小FIFO队列的生产者/消费者模式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

  • 07-25 07:04