上篇写到了ThreadPoolExecutor构造方法前4个参数int corePoolSize、int maximumPoolSize,、long keepAliveTime、TimeUnit unit与工作流程机制,现在来看看后3个参数BlockingQueue workQueue、ThreadFactory threadFactory、RejectedExecutionHandler handler的含义与用法。

参数详解

workQueue:阻塞任务队列,用于保存任务以及为工作线程提供待执行的任务

block queue有以下几种实现:

  • ArrayBlockingQueue : 有界的数组队列
  • LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
  • PriorityBlockingQueue : 优先队列,可以针对任务排序
  • SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。

    threadFactory:线程工厂,线程生成器

    handler:当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理;ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略:
  • CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
  • AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
  • DiscardPolicy:直接抛弃任务,不做任何处理
  • DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交。

首先若正在运行的线程数量大于或等于 maximumPoolSize时:

public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<>(1)); List<Future<String>> resultList = new ArrayList<Future<String>>();
for (int i = 0; i < 10 ; i++) {
Future<String> future = executor.submit(new MyWorker(i));
resultList.add(future);
} System.out.println("所有线程已经运行完成");
for (Future<String> future:resultList) {
try { System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} } static class MyWorker implements Callable<String> {
private int i;
public MyWorker(int i) {
this.i = i;
} @Override
public String call() {
try {
System.out.println(i);
//模拟执行
Thread.sleep(500);
String result = "序号" + i+ "_当前线程==" + Thread.currentThread().getName();
return result;
} catch (Exception e) {
e.printStackTrace();
} return "error";
}
}

ThreadPoolExecutor(下篇)-LMLPHP

对此种情况进行处理,自定义拒绝处理类,实现RejectedExecutionHandler接口(和new ThreadPoolExecutor.CallerRunsPolicy()功能一致)

ExecutorService executor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<>(1), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return null;
}
}, new MyRejectedExecutionHandler()); static class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
}
}
}

ThreadPoolExecutor(下篇)-LMLPHP

新增ThreadFactory自定义类:

static class MyThreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}

ThreadPoolExecutor(下篇)-LMLPHP

05-11 15:20