问题描述
我正在尝试编写一个解决方案,在该解决方案中,单个线程产生可以并行执行的 I/O 密集型任务.每个任务都有重要的内存数据.所以我希望能够限制当前待处理的任务数量.
I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task have significant in-memory data. So I want to be able limit the number of tasks that are pending at a moment.
如果我像这样创建 ThreadPoolExecutor:
If I create ThreadPoolExecutor like this:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
然后 executor.submit(callable)
在队列已满且所有线程都已忙时抛出 RejectedExecutionException
.
Then the executor.submit(callable)
throws RejectedExecutionException
when the queue fills up and all the threads are already busy.
当队列已满且所有线程都忙时,如何使 executor.submit(callable)
阻塞?
What can I do to make executor.submit(callable)
block when the queue is full and all threads are busy?
编辑:我试过这个:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
它在某种程度上达到了我想要的效果,但方式不雅(基本上被拒绝的线程在调用线程中运行,因此这会阻止调用线程提交更多).
And it somewhat achieves the effect that I want achieved but in an inelegant way (basically rejected threads are run in the calling thread, so this blocks the calling thread from submitting more).
(提出问题5年后)
对于阅读此问题及其答案的任何人,请不要将已接受的答案视为正确的解决方案.请通读所有答案和评论.
To anyone reading this question and its answers, please don't take the accepted answer as one correct solution. Please read through all answers and comments.
推荐答案
我也做过同样的事情.诀窍是创建一个 BlockingQueue,其中 offer() 方法实际上是一个 put().(你可以使用任何你想要的基本 BlockingQueue impl).
I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (you can use whatever base BlockingQueue impl you want).
public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
请注意,这仅适用于 corePoolSize==maxPoolSize
的线程池,所以要小心(见评论).
Note that this only works for thread pool where corePoolSize==maxPoolSize
so be careful there (see comments).
这篇关于Java:ExecutorService 在特定队列大小后阻止提交的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!