问题描述
我已经想出了一些code消耗从队列中的所有瓦亭项目。而不是1加工项目1,是有意义的处理都在等着项目为一组。
I've come up with some code to consume all wating items from a queue. Rather than processing the items 1 by 1, it makes sense to process all waiting items as a set.
我宣布我的队列是这样的。
I've declared my queue like this.
private BlockingCollection<Item> items =
new BlockingCollection<Item>(new ConcurrentQueue<Item>);
然后,在一个消费者线程,我打算读的项目在这样的批次,
Then, on a consumer thread, I plan to read the items in batches like this,
Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
var workToDo = new List<Item>();
workToDo.Add(nextItem);
while(this.items.TryTake(out nextItem))
{
workToDo.Add(nextItem);
}
// process workToDo, then go back to the queue.
}
此方法缺乏 GetConsumingEnumerable
,我不禁想,如果我错过了一个更好的办法,如果我的做法是错误的。
This approach lacks the utility of
GetConsumingEnumerable
and I can't help wondering if I've missed a better way, or if my approach is flawed.
有没有更好的方式来消耗
BlockingCollection&LT; T&GT;
分批
Is there a better way to consume a
BlockingCollection<T>
in batches?
推荐答案
虽然不如
ConcurrentQueue&LT; T&GT;
在某些方面,我自己的 LLQueue&LT; T&GT;
允许成批出列与 AtomicDequeueAll 的方法,其中,当前的所有项目的队列,从它发生在一个单一的(原子和线程安全的)的操作,并随后在非线程收集消费由单个线程。这种方法的目的是precisely为要批量读取操作的方案。
While not as good as
ConcurrentQueue<T>
in some ways, my own LLQueue<T>
allows for a batched dequeue with a AtomicDequeueAll method where all items currently on the queue are taken from it in a single (atomic and thread-safe) operation, and are then in a non-threadsafe collection for consumption by a single thread. This method was designed precisely for the scenario where you want to batch the read operations.
这是不堵,虽然,虽然它可以被用来创建一个阻塞收集足够容易:
This isn't blocking though, though it could be used to create a blocking collection easily enough:
public BlockingBatchedQueue<T>
{
private readonly AutoResetEvent _are = new AutoResetEvent(false);
private readonly LLQueue<T> _store;
public void Add(T item)
{
_store.Enqueue(item);
_are.Set();
}
public IEnumerable<T> Take()
{
_are.WaitOne();
return _store.AtomicDequeueAll();
}
public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
{
if(_are.WaitOne(millisecTimeout))
{
items = _store.AtomicDequeueAll();
return true;
}
items = null;
return false;
}
}
这是一个起点,没有做到以下几点:
That's a starting point that doesn't do the following:
在应对处置时挂起等待的读者。
在担心与多个阅读器既是一个潜在的竞争正在由一个写发生,而一个正在读触发(它只是认为偶尔空的结果枚举会好起来的)。
将任何上限写作。
所有这一切都可以增加太多,但我想保持到最低限度的一些实际应用,有希望不在上述规定的限制马车。
All of which could be added too, but I wanted to keep to the minimum of some practical use, that hopefully isn't buggy within the defined limitations above.
这篇关于如何消耗BlockingCollection&LT; T&GT;分批的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!