我对与Java BlockingQueue相同的数据结构感兴趣,不同之处在于它必须能够批处理队列中的对象。换句话说,我希望生产者能够将对象放入队列中,但是让使用者在take()上阻塞,直到队列达到特定大小(批处理大小)为止。

然后,一旦队列达到批量大小,生产者就必须阻塞put(),直到消费者消耗完队列中的所有元素(在这种情况下,生产者将再次开始生产,而消费者则阻塞直到再次达到批量) 。

是否存在类似的数据结构?还是我应该写(我不介意),如果那里有东西,我只是不想浪费时间。

更新

也许可以澄清一下:

情况将始终如下。可以有多个生产者将项目添加到队列中,但是从队列中提取项目的消费者永远不会超过一个。

现在,问题在于并行和串行存在多个这些设置。换句话说,生产者为多个队列生产商品,而消费者本身也可以是生产者。可以更容易地将其视为生产者,消费者生产者以及最终消费者的有向图。

生产者应该阻塞直到队列为空(@Peter Lawrey)的原因是,每个队列都将在线程中运行。如果让它们仅在空间可用时就进行生产,那么最终将出现以下情况:您有太多线程试图一次处理太多东西。

也许将其与执行服务结合起来可以解决问题?

最佳答案

我建议您使用BlockingQueue.drainTo(Collection, int)。您可以将其与take()一起使用,以确保获得最少数量的元素。

使用此方法的优点是您的批量大小随工作负载动态增长,并且生产者不必在消费者忙时阻塞。即,它针对延迟和吞吐量进行了自我优化。

要完全按要求实现(我认为这是一个坏主意),可以将SynchronousQueue与繁忙的消费线程一起使用。

即使用线程执行

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

生产者将在消费者忙时阻塞。

10-07 12:05
查看更多