(注意:我使用的是.Net 4,而不是.Net 4.5,所以我不能使用TPL的DataflowBlock类。)
TL; DR版本
最终,我只是在寻找一种使用多个线程来处理顺序工作项的方式,该方式可以在最终输出中保留其顺序,而无需无限制的输出缓冲区。
动机
我现有的代码提供了一种用于处理多个数据块的多线程机制,其中一个I/O绑定(bind)线程(“供应商”)负责将数据块排队以进行处理。这些数据块构成工作项。
一个或多个线程(“处理器”)负责一次使一个工作项出队,它们进行处理,然后在将其下一个工作项出队之前将处理后的数据写入输出队列。
最终的I/O绑定(bind)线程(“消费者”)负责将已完成的工作项从输出队列中取出并写入最终目标。这些工作项(并且必须)按照入队的顺序来编写。我使用并发优先级队列实现了这一点,其中每个项目的优先级由其源索引定义。
我正在使用此方案对大型数据流进行一些自定义压缩,其中压缩本身相对较慢,但是未压缩数据的读取和压缩数据的写入相对较快(尽管受I/O限制)。
我以大约64K的较大块处理数据,因此管道的开销相对较小。
我当前的解决方案运行良好,但是它涉及6年前使用许多同步事件编写的自定义代码,并且设计似乎有些笨拙。因此,我已开始从事学术练习,以查看是否可以使用更现代的.Net库来重写它。
新设计
我的新设计使用了BlockingCollection<>
类,并且在某种程度上基于this Microsoft article。
特别是,请查看标题为“使用多个生产者进行负载平衡”的部分。我已经尝试过使用这种方法,因此我有多个处理任务,每个处理任务都从共享输入BlockingCollection中获取工作项,并将完成的项写入其自己的BlockingCollection输出队列中。
因为每个处理任务都有其自己的输出队列,所以我试图使用BlockingCollection.TakeFromAny()
使第一个可用的已完成工作项出队。
复用器问题
到目前为止,一切都很好,但是现在出现了问题。 Microsoft文章指出:
好的,所以发生的事情是,处理任务几乎可以按任何顺序生产成品。多路复用器负责以正确的顺序输出这些项目。
然而...
想象一下,我们要处理1000个项目。进一步想象一下,由于某种奇怪的原因,第一个项目要花更长的时间才能处理所有其他项目的总和。
使用我当前的方案,多路复用器将继续从所有处理输出队列中读取和缓冲项目,直到找到应该输出的下一个为止。由于它等待的项目(根据我上面的“假设是否存在”)仅在所有其他工作项目都已处理之后才会出现,因此我将有效地在整个输入中缓冲所有工作项目!
数据量太大而无法发生这种情况。我需要能够在输出队列达到某个最大大小(即它是有界输出队列)时停止处理任务输出已完成的工作项目,除非工作项目恰好是多路复用器正在等待的项目。
那就是我有点卡住的地方。我可以想到许多实际实现此方法的方法,但是它们似乎都过于复杂,以至于它们并不比我想替换的代码好!
我的问题是什么?
我的问题是:我要这样做正确吗?
我本来以为这将是一个容易理解的问题,但是我的研究只发现了一些文章,这些文章似乎忽略了一个工作项与所有其他工作项相比花费很长时间会发生的无限缓冲问题。
谁能指出任何描述实现此目标的合理方法的文章?
TL; DR版本
最终,我只是在寻找一种使用多个线程来处理顺序工作项的方式,该方式可以在最终输出中保留其顺序,而无需无限制的输出缓冲区。
最佳答案
例如,在启动时创建一个项目池,例如1000。将它们存储在BlockingCollection中-“池队列”。
供应商从池队列中获取项目,从文件中加载它们,以序列号/任何内容加载,然后将它们提交给处理器线程池。
处理器完成其工作,然后将输出发送到多路复用器。多路复用器会完成所有杂乱的项目的存储,直到处理了较早的项目为止。
当多路复用器输出的任何物品完全消耗掉某个物品时,它们将返回到池队列中,以供供应商重复使用。
如果一个“慢项”确实需要大量处理,则多路复用器中的乱序收集将随着“快速项”在其他池线程中的通过而增加,但由于多路复用器实际上并未将其项馈送到它的输出,不补充池队列。
当池为空时,供应商将阻止该池,并且将无法再供应任何物品。
处理池输入中剩余的“快速项目”将被处理,然后除“慢速项目”外将停止处理。供应商被阻止,多路复用器在其集合中有[poolSize-1]个项目。不会使用额外的内存,不会浪费CPU,唯一发生的是对“慢速项目”的处理。
当“慢项”最终完成时,它将输出到多路复用器。
现在,多路复用器可以按要求的顺序输出所有[poolSize]项。随着这些项目的消耗,池再次被填满,现在可以从池中获取项目的供应商继续运行,再次读取其文件,将项目排队到处理器池中。
自动调节,不需要限制的缓冲区,没有内存失控。
编辑:我的意思是“不需要任何限制的缓冲区” :)
而且,没有GC保留-由于这些项目已被重复使用,因此它们不需要GC'ing。