Closed. This question needs details or clarity。它当前不接受答案。
想改善这个问题吗?添加详细信息并通过editing this post阐明问题。
6年前关闭。
我需要使用两个线程创建生产者-消费者realtionshiop。一种方法是从磁盘上的文件中读取文本对象,然后将对象插入FIFO中的队列中,而消费线程正在从队列中读取以处理对象。但是我对于使用什么类感到困惑? LinkedBlockingQueue还是PriorityBlockingQueue?甚至更好的东西?
目的和目的:
我正在尝试实时对推文进行聚类,但是我将推文存档在csv文件中,并且未使用Twitter Streaming API。因此,我试图通过从文件中读取推文并将其放入队列中来模拟流的效果,然后使用者开始从队列中进行读取。我真的有很大的csv文件,因此,我更喜欢流式传输方案。因此,当我收到推文时,消费者正在从队列中获取推文并实时将其聚类。
在生产者中,您继续做:
并在消费者中:
彼得·劳瑞(Peter Lawrey)的替代方法包括:
在您的生产者中:
由于生产者已经创建了消费者(任务),因此没有消费者。
注意:在线程池示例中,基于
如果是这种情况(受CPU限制),则可以使用
如果不是(受I / O限制),则您将使用更多线程-最佳数量难以事先估算,并且您需要使用不同的数量对应用进行配置文件以找到最佳值。
想改善这个问题吗?添加详细信息并通过editing this post阐明问题。
6年前关闭。
我需要使用两个线程创建生产者-消费者realtionshiop。一种方法是从磁盘上的文件中读取文本对象,然后将对象插入FIFO中的队列中,而消费线程正在从队列中读取以处理对象。但是我对于使用什么类感到困惑? LinkedBlockingQueue还是PriorityBlockingQueue?甚至更好的东西?
目的和目的:
我正在尝试实时对推文进行聚类,但是我将推文存档在csv文件中,并且未使用Twitter Streaming API。因此,我试图通过从文件中读取推文并将其放入队列中来模拟流的效果,然后使用者开始从队列中进行读取。我真的有很大的csv文件,因此,我更喜欢流式传输方案。因此,当我收到推文时,消费者正在从队列中获取推文并实时将其聚类。
最佳答案
在您的情况下,PriorityBlockingQueue似乎没有意义,因为您只想按其原始顺序处理消息。
如果您真的想自己处理队列,则可以使用有界的LinkedBlockingQueue:
//example with a limit of 100,000 messages being in the queue at any one time
private static final BlockingQueue<Message> queue =
new LinkedBlockingQueue<> (100_000);
在生产者中,您继续做:
Message msg = getMessage();
queue.put(msg); //blocks if the queue is full
并在消费者中:
Message msg = queue.take(); //blocks until there is a message
彼得·劳瑞(Peter Lawrey)的替代方法包括:
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
在您的生产者中:
final Message msg = getMessage();
Runnable task = new Runnable() {
public void run() { process(msg); }
}
executor.submit(task);
由于生产者已经创建了消费者(任务),因此没有消费者。
注意:在线程池示例中,基于
process
方法主要受CPU限制并且您拥有大约10个处理器的假设,我使用了10个线程的大小。在实践中:如果是这种情况(受CPU限制),则可以使用
Runtime.getRuntime().availableProcessors()
来获取处理器数量,而不是经过编码的数量。如果不是(受I / O限制),则您将使用更多线程-最佳数量难以事先估算,并且您需要使用不同的数量对应用进行配置文件以找到最佳值。
10-06 14:59