Closed. This question needs details or clarity。它当前不接受答案。
                            
                        
                    
                
            
                    
                
                        
                            
                        
                    
                        
                            想改善这个问题吗?添加详细信息并通过editing this post阐明问题。
                        
                        6年前关闭。
                    
                
        

我需要使用两个线程创建生产者-消费者realtionshiop。一种方法是从磁盘上的文件中读取文本对象,然后将对象插入FIFO中的队列中,而消费线程正在从队列中读取以处理对象。但是我对于使用什么类感到困惑? LinkedBlockingQueue还是PriorityBlockingQueue?甚至更好的东西?

目的和目的:

我正在尝试实时对推文进行聚类,但是我将推文存档在csv文件中,并且未使用Twitter Streaming API。因此,我试图通过从文件中读取推文并将其放入队列中来模拟流的效果,然后使用者开始从队列中进行读取。我真的有很大的csv文件,因此,我更喜欢流式传输方案。因此,当我收到推文时,消费者正在从队列中获取推文并实时将其聚类。

最佳答案

在您的情况下,PriorityBlockingQueue似乎没有意义,因为您只想按其原始顺序处理消息。

如果您真的想自己处理队列,则可以使用有界的LinkedBlockingQueue:

//example with a limit of 100,000 messages being in the queue at any one time
private static final BlockingQueue<Message> queue =
                                      new LinkedBlockingQueue<> (100_000);


在生产者中,您继续做:

Message msg = getMessage();
queue.put(msg); //blocks if the queue is full


并在消费者中:

Message msg = queue.take(); //blocks until there is a message




彼得·劳瑞(Peter Lawrey)的替代方法包括:

private static final ExecutorService executor = Executors.newFixedThreadPool(10);


在您的生产者中:

final Message msg = getMessage();
Runnable task = new Runnable() {
    public void run() { process(msg); }
}
executor.submit(task);


由于生产者已经创建了消费者(任务),因此没有消费者。



注意:在线程池示例中,基于process方法主要受CPU限制并且您拥有大约10个处理器的假设,我使用了10个线程的大小。在实践中:


如果是这种情况(受CPU限制),则可以使用Runtime.getRuntime().availableProcessors()来获取处理器数量,而不是经过编码的数量。
如果不是(受I / O限制),则您将使用更多线程-最佳数量难以事先估算,并且您需要使用不同的数量对应用进行配置文件以找到最佳值。

10-06 14:59