Problem description
(NOTE: I'm using .Net 4, not .Net 4.5, so I cannot use the TPL's DataflowBlock classes.)
TL;DR version
Ultimately, I'm just looking for a way to process sequential work items using multiple threads in a way that preserves their order in the final output, without requiring an unbounded output buffer.
Motivation
I have existing code that provides a multithreaded mechanism for processing multiple blocks of data, where one I/O-bound thread (the "supplier") is responsible for enqueuing blocks of data for processing. These blocks of data comprise the work items.
One or more threads (the "processors") are responsible for dequeuing one work item at a time, which they process and then write the processed data to an output queue before dequeuing their next work item.
A final I/O-bound thread (the "consumer") is responsible for dequeuing completed work items from the output queue and writing them to the final destination. These work items are (and must be) written in the same order that they were enqueued. I implemented this using a concurrent priority queue, where the priority of each item is defined by its source index.
I'm using this scheme to do some custom compression on a large data stream, where the compression itself is relatively slow but the reading of the uncompressed data and the writing of the compressed data is relatively fast (although I/O-bound).
I process the data in fairly large chunks of the order of 64K, so the overhead of the pipeline is relatively small.
My current solution is working well, but it involves a lot of custom code written 6 years ago using many synchronisation events, and the design seems somewhat clunky. I have therefore embarked on an academic exercise to see whether it can be rewritten using more modern .Net libraries.
New design
My new design uses the BlockingCollection<T> class, and is based somewhat on this Microsoft article.
In particular, look at the section entitled Load Balancing Using Multiple Producers. I have tried using that approach, and therefore I have several processing tasks each of which takes work items from a shared input BlockingCollection and writes its completed items to its own BlockingCollection output queue.
Because each processing task has its own output queue, I'm trying to use BlockingCollection.TakeFromAny() to dequeue the first available completed work item.
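For readers outside .NET, here is a rough Java analogue of this layout. It is a sketch under stated assumptions, not the Microsoft article's code. ArrayBlockingQueue stands in for a bounded BlockingCollection<T>. One shared input queue feeds several processor threads, and each processor writes to its own bounded output queue. Java's BlockingQueue has no counterpart to TakeFromAny, so the hypothetical takeFromAny helper approximates it by polling. The POISON end-of-input marker and the squaring "work" are also illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Shared input queue, several processors, one bounded output queue per processor.
final class FanOutFanIn {
    private static final int POISON = -1; // hypothetical end-of-input marker

    // Approximates BlockingCollection.TakeFromAny by polling each queue in turn
    // with a short timeout until one of them yields an item.
    static int takeFromAny(List<BlockingQueue<Integer>> queues) throws InterruptedException {
        while (true) {
            for (BlockingQueue<Integer> q : queues) {
                Integer v = q.poll(1, TimeUnit.MILLISECONDS);
                if (v != null) return v;
            }
        }
    }

    // Squares 0..count-1 on 'workers' threads; results come back in completion order.
    static List<Integer> run(int count, int workers) {
        BlockingQueue<Integer> input = new ArrayBlockingQueue<>(16);
        List<BlockingQueue<Integer>> outputs = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            BlockingQueue<Integer> out = new ArrayBlockingQueue<>(16); // this processor's own queue
            outputs.add(out);
            Thread processor = new Thread(() -> {
                try {
                    while (true) {
                        int x = input.take();
                        if (x == POISON) break; // supplier has finished
                        out.put(x * x);         // "process" the work item
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            processor.start();
            threads.add(processor);
        }
        Thread supplier = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) input.put(i);
                for (int w = 0; w < workers; w++) input.put(POISON); // one marker per processor
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        supplier.start();
        List<Integer> results = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) results.add(takeFromAny(outputs));
            supplier.join();
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }
}
```

The results arrive in completion order, not source order. Restoring the source order is left to the multiplexer.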
The multiplexer problem
So far so good, but now here comes the problem. The Microsoft article states:
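"Gaps in the sequence are a problem. The stage of the pipeline that displays images needs to display them in order and without gaps in the sequence. This is where the multiplexer comes in. Using the TakeFromAny method, the multiplexer waits for input from the producer queues of both filter stages. When an image arrives, the multiplexer checks whether the image's sequence number is the next one in the expected sequence. If it is, the multiplexer passes it to the stage that displays the image. If the image is not the next one in the sequence, the multiplexer holds the value in an internal look-ahead buffer and repeats the take operation on the input queue that does not have a look-ahead value. This algorithm allows the multiplexer to put together the inputs from the incoming producer queues in a way that ensures sequential order without sorting the values."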
Ok, so what happens is that the processing tasks can produce finished items in pretty much any order. The multiplexer is responsible for outputting these items in the correct order.
However...
Imagine that we have 1000 items to process. Further imagine that, for some weird reason, the very first item takes longer to process than all the other items combined.
Using my current scheme, the multiplexer will keep reading and buffering items from all the processing output queues until it finds the next one that it's supposed to output. Since the item that it's waiting for is (according to my "imagine if" above) only going to appear after ALL the other work items have been processed, I will effectively be buffering all the work items in the entire input!
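To make the growth concrete, here is a minimal Java sketch of a look-ahead buffer of the kind just described. Out-of-order items are parked in a map keyed by sequence number until every earlier item has been emitted. The class name LookAheadMultiplexer and its peak counter are illustrative assumptions, not part of the Microsoft article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Reorder ("look-ahead") buffer: out-of-order items wait in a map keyed by
// sequence number until every earlier item has been emitted.
final class LookAheadMultiplexer<T> {
    private final Map<Long, T> parked = new HashMap<>();
    private long next = 0; // sequence number the output is waiting for
    private int peak = 0;  // largest number of items ever parked at once

    // Accepts one item and returns the run of items that can now be emitted in order.
    List<T> accept(long seq, T item) {
        List<T> ready = new ArrayList<>();
        if (seq != next) {
            parked.put(seq, item); // nothing bounds how many items end up here
            peak = Math.max(peak, parked.size());
            return ready;
        }
        ready.add(item);
        next++;
        // Drain parked items that are now contiguous with the output.
        T following;
        while ((following = parked.remove(next)) != null) {
            ready.add(following);
            next++;
        }
        return ready;
    }

    int peakParked() { return peak; }
}
```

If items 1 to 999 are fed in before item 0, all 999 of them are parked at once. Nothing in the structure itself limits that number.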
The amount of data is way too large to allow this to happen. I need to be able to stop the processing tasks from outputting completed work items when the output queue has reached a certain maximum size (i.e. it's a bounded output queue) UNLESS the work item happens to be the one the multiplexer is waiting for.
And that's where I'm getting a bit stuck. I can think of many ways to actually implement this, but they all seem so complex that they are no better than the code I'm thinking of replacing!
My question
My question is: Am I going about this the right way?
I would have thought this would be a well-understood problem, but my research has only turned up articles that seem to ignore the unbounded buffering problem that occurs if a work item takes a very long time compared to all the other work items.
Can anyone point me at any articles that describe a reasonable way to achieve this?
Recommended answer
Create a pool of items at startup, 1000, say. Store them on a BlockingCollection - a 'pool queue'.
The supplier gets items from the pool queue, loads them from the file, loads in the sequence-number/whatever and submits them to the processors threadpool.
The processors do their stuff and send the output to the multiplexer. The multiplexer does its job of storing any out-of-order items until earlier items have been processed.
When an item has been completely consumed by whatever the multiplexer outputs to, they are returned to the pool queue for re-use by the supplier.
If one 'slow item' does require enormous amounts of processing, the out-of-order collection in the multiplexer will grow as the 'quick items' slip through on the other pool threads, but because the multiplexer is not actually feeding its items to its output, the pool queue is not being replenished.
When the pool empties, the supplier will block on it and will be unable to supply any more items.
The 'quick items' remaining on the processing pool input will get processed and then processing will stop except for the 'slow item'. The supplier is blocked, the multiplexer has [poolSize-1] items in its collection. No extra memory is being used, no CPU is being wasted, the only thing happening is the processing of the 'slow item'.
When the 'slow item' is finally done, it gets output to the multiplexer.
The multiplexer can now output all [poolSize] items in the required sequential order. As these items are consumed, the pool gets filled up again. The supplier, now able to get items from the pool again, carries on reading its file and queueing up items to the processor pool.
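The pool-based scheme can be sketched in Java as follows, under several assumptions:

- PooledItem, PooledPipeline and the 200 ms delay on item 0 are hypothetical.
- ArrayBlockingQueue stands in for the pool BlockingCollection.
- An ExecutorService stands in for the processor threadpool.
- Doubling the sequence number stands in for the real processing.

The key property is that the only path back into the pool runs through the multiplexer's in-order output. As a result, the number of parked out-of-order items can never exceed poolSize - 1.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Reusable work item (hypothetical): a fixed 64K buffer plus a sequence number.
final class PooledItem {
    final byte[] buffer = new byte[64 * 1024]; // reused, never reallocated
    long seq;
    int value; // stand-in for the processed data in this sketch
}

final class PooledPipeline {
    // Runs 'count' items through 'workers' processors with 'poolSize' pooled items.
    // Item 0 is made artificially slow. Returns {itemsEmittedCorrectlyInOrder, peakParked}.
    static int[] run(int count, int workers, int poolSize) {
        BlockingQueue<PooledItem> pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) pool.add(new PooledItem());
        BlockingQueue<PooledItem> done = new LinkedBlockingQueue<>(); // processors -> multiplexer
        ExecutorService processors = Executors.newFixedThreadPool(workers);

        Thread supplier = new Thread(() -> {
            try {
                for (int s = 0; s < count; s++) {
                    PooledItem item = pool.take(); // blocks while the pool is empty
                    item.seq = s;                  // "load" the item
                    processors.execute(() -> {
                        if (item.seq == 0) sleepQuietly(200); // the one slow item
                        item.value = (int) item.seq * 2;      // "process" it
                        done.add(item);
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        supplier.start();

        // Multiplexer + consumer: park out-of-order items, emit in sequence, recycle to the pool.
        Map<Long, PooledItem> parked = new HashMap<>();
        long next = 0;
        int emitted = 0;
        int peak = 0;
        try {
            while (next < count) {
                PooledItem finished = done.take();
                parked.put(finished.seq, finished);
                PooledItem ready;
                while ((ready = parked.remove(next)) != null) {
                    if (ready.value == next * 2) emitted++; // "write" it; count correct results only
                    next++;
                    pool.put(ready); // the only way an item returns to the supplier
                }
                peak = Math.max(peak, parked.size());
            }
            supplier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            processors.shutdown();
        }
        return new int[] {emitted, peak};
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Consider a run with 50 items, 3 processors and a pool of 8. The supplier would typically block after handing out all 8 pooled items, and the 7 quick ones would sit in the parked map until item 0 finishes. After that, everything drains in order. Because the items and their buffers are recycled rather than reallocated, this also reflects the answer's GC point.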
Auto-regulating, no bounded buffers required, no memory runaway.
EDIT: I meant 'no bounded buffers required' :)
Also, no GC holdups - since the items are re-used, they don't need GC'ing.