上篇写到了ThreadPoolExecutor构造方法前4个参数int corePoolSize、int maximumPoolSize,、long keepAliveTime、TimeUnit unit与工作流程机制,现在来看看后3个参数BlockingQueue workQueue、ThreadFactory threadFactory、RejectedExecutionHandler handler的含义与用法。
参数详解
workQueue:阻塞任务队列,用于保存任务以及为工作线程提供待执行的任务
block queue有以下几种实现:
- ArrayBlockingQueue : 有界的数组队列
- LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
- PriorityBlockingQueue : 优先队列,可以针对任务排序
- SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。
threadFactory:线程工厂,线程生成器
handler:当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理;ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略: - CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
- AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
- DiscardPolicy:直接抛弃任务,不做任何处理
- DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交。
首先若正在运行的线程数量大于或等于 maximumPoolSize时:
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<>(1));
List<Future<String>> resultList = new ArrayList<Future<String>>();
for (int i = 0; i < 10 ; i++) {
Future<String> future = executor.submit(new MyWorker(i));
resultList.add(future);
}
System.out.println("所有线程已经运行完成");
for (Future<String> future:resultList) {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
static class MyWorker implements Callable<String> {
private int i;
public MyWorker(int i) {
this.i = i;
}
@Override
public String call() {
try {
System.out.println(i);
//模拟执行
Thread.sleep(500);
String result = "序号" + i+ "_当前线程==" + Thread.currentThread().getName();
return result;
} catch (Exception e) {
e.printStackTrace();
}
return "error";
}
}
对此种情况进行处理,自定义拒绝处理类,实现RejectedExecutionHandler接口(和new ThreadPoolExecutor.CallerRunsPolicy()功能一致)
ExecutorService executor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<>(1), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return null;
}
}, new MyRejectedExecutionHandler());
static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
}
}
}
新增ThreadFactory自定义类:
static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}