问题描述
我需要实施阻止ThreadPoolExecutor
.
这是我们企业应用程序中非常关键的要求.
This is a very crucial requirement in our enterprise application.
如果ThreadPoolExecutor.submit()
或ThreadPoolExecutor.execute()
方法阻塞,直到某个线程被释放以接管新任务,它就会执行类似的操作.
It would do something like if ThreadPoolExecutor.submit()
or ThreadPoolExecutor.execute()
method blocks until a thread gets freed up for picking up a new task.
但是在当前实现中,如果所有池化线程都变得繁忙,则ThreadPoolExecutor.submit()
和ThreadPoolExecutor.execute()
方法将引发RejectedExecutionException异常.
But in current implementation ThreadPoolExecutor.submit()
and ThreadPoolExecutor.execute()
methods throw RejectedExecutionException exception if all pooled threads get busy.
例如下面的代码抛出RejectedExecutionException
:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BlockingTPE {
public static void main(String[] args) {
ArrayBlockingQueue queue = new ArrayBlockingQueue(3);
ThreadPoolExecutor tpExe = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
int numJobs = 50;
for (int i = 1; i <= numJobs; i++) {
try {
tpExe.submit(new WorkerThread(i));
System.out.println("Added#" + (i));
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
}
}
class WorkerThread implements Runnable {
int jobId;
public WorkerThread(int jobId) {
this.jobId = jobId;
}
public void run() {
try {
Thread.sleep(1000);
}
catch (Exception excep) {
}
}
}
推荐答案
如ThreadPoolExecutor
的javadoc所述:
As the javadoc of ThreadPoolExecutor
states:
被拒绝的执行程序处理程序是AbortPolicy
的实例,如果队列不接受其他任务,则将调用该实例.从javadoc开始的行为:
The rejected executor handler is an instance of AbortPolicy
which will be called if the queue does not accept another task. The behavior as of the javadoc:
因此,阻塞队列对您没有任何影响.我以这种方式更改了您的代码,它的运行没有任何问题:
Hence the blocking queue does not have any effect for you. I changed your code this way and it runs without any issues:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(3));
try {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
int numJobs = 50;
for (int i = 1; i <= numJobs; i++) {
try {
executor.submit(new WorkerThread(i));
System.out.println("Added#" + (i));
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
} finally {
executor.shutdown();
}
}
您必须做出的决定是:
- 使用未绑定队列来支持所有延迟的任务.例如
LinkedBlockingQueue
. - 使用绑定队列,让当前线程执行不适合整个队列的任务.例如,请参阅我在答案中发布的代码. 如果边界队列已满,请
- 放弃任务.例如,使用
ThreadPoolExecutor.DiscardPolicy
作为拒绝的执行处理程序.
- use an unbound queue to support all delayed tasks. For example
LinkedBlockingQueue
. - use a bound queue and let the current thread execute the task which does not fit into the full queue. For example see the code I posted along my answer.
- discard tasks if the bounded queue is full. For example use
ThreadPoolExecutor.DiscardPolicy
as rejected execution handler.
这篇关于如何实现阻塞ThreadPoolExecutor的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!