问题描述
我有一种情况,我希望添加到线程池比处理更快。我不认为无限队列是一个好主意,因为有足够的数据,如果不加以检查,队列可能会增长以占用所有内存。鉴于此,我正在尝试确定ThreadPoolExecutor的正确设置。
I have a situation where I expect adding to the thread pool to be faster than processing. I don't think an unbounded queue will be a good idea because there is enough data that the queue might grow to eat all memory if left unchecked. Given this I'm trying to determine the correct setup for a ThreadPoolExecutor.
我首先想到的是一个固定的线程池,它具有直接切换和调用者运行失败策略。但是我想知道这是否会影响吞吐量(因为每次调用者运行策略时都会调用线程池任务可能会完成并暂停一段时间)。
My first thought is a fixed thread pool with direct handoff and caller runs failure policy. But I wonder if this will hurt throughput (because every time caller runs policy is invoked the thread pool tasks will likely complete and sit idle for some time).
另一个想法是使用ArrayBlockingQueue修复了线程池,但我实际上并不确定它的行为。我希望这意味着Executor更喜欢创建线程,如果小于coreThread大小,然后排队,如果队列已满,它会阻止等待队列获得空间。但在阅读这里的文档时:
Another idea is fixed thread pool with ArrayBlockingQueue, but I'm actually not sure of the behavior of that. I'm hoping it means that the Executor prefers creating threads if less than coreThread size, then queues, and if the queue is full it blocks waiting for the queue to get space. But in reading the docs here:
似乎它更喜欢创建线程到corePoolSize,然后添加到队列,如果队列已满,它将尝试创建最大为maxThreads的线程(在这种情况下与coreThreads相同) ),并且未能运行失败政策。
It seems that it will prefer creating threads up to corePoolSize, then add to the queue, and if the queue is full it will try creating threads up to maxThreads (same as coreThreads in this case), and failing that it will run the failure policy.
有人可以澄清上述案例的行为吗?并且还建议针对这个特定情况的最佳设置(我建议的一个想法或其他一些可能更好的设置)?
Can someone clarify the behavior for the case above? And also suggest what the best setup might be for this particular case (one of my suggested ideas or some other one that might work better)?
推荐答案
我想这是另一个答案,因为它是对同一问题的不同解决方案。
I figure I make this another answer because its a different solution to the same problem.
您只能使用ThreadPoolExecutor和Semaphore。信号量将使用您希望在队列中允许的最大数量创建,并且在每个线程完成执行后,您将调用release(beforeExecute,即从项目中取出项目时)
You can use just a ThreadPoolExecutor and Semaphore. The semaphore would be created with the max number you want to allow in the queue and after each thread finishes execution, you would invoke release (beforeExecute, which is when the item is pulled off the queue)
Semaphore semaphore = new Semaphore(1000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()){
protected void beforeExecute(Runnable r, Throwable t) {
semaphore.release();
}
}
public void doSubmit(Runnable r){
sempahore.acquire();
executor.submit(r);
}
所以这里所有线程都将暂停,直到有可用的许可证(队列)。
So here all threads will suspend until there is a permit available (entry on the queue).
这篇关于ThreadPoolExecutor配置的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!