ThreadPoolExecutor
线程池核心实现类
线程池的生命周期
RUNNING: 接受新任务,同时处理工作队列中的任务
SHUTDOWN: 不接受新任务,但是能处理工作队列中的任务
STOP: 不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程。
TIDYING: 所有的工作者线程都已经停止,将运行 terminated() 钩子函数。
TERMINATED: terminated() 钩子函数运行完毕
创建实例
/**
* 低 29 位设置为线程池的工作线程数
* 高 3 为设置为线程池的生命周期状态
*/
private final AtomicInteger ctl = new AtomicInteger(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.RUNNING, 0));
// 线程池的工作线程数在 int 中占用的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程数掩码
private static final int COUNT_MASK = (1 << ThreadPoolExecutor.COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程池处于运行状态:接受新任务,同时处理工作队列中的任务
private static final int RUNNING = -1 << ThreadPoolExecutor.COUNT_BITS;
// 线程池正在停止:不接受新任务,但是能处理工作队列中的任务
private static final int SHUTDOWN = 0 << ThreadPoolExecutor.COUNT_BITS;
// 线程池已经停止:不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程
private static final int STOP = 1 << ThreadPoolExecutor.COUNT_BITS;
// 线程池正在执行清理:所有的工作者线程都已经停止,将运行 terminated() 钩子函数
private static final int TIDYING = 2 << ThreadPoolExecutor.COUNT_BITS;
// 线程池已经清理完毕:terminated() 钩子函数运行完毕
private static final int TERMINATED = 3 << ThreadPoolExecutor.COUNT_BITS;
/**
* 任务队列,
* 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务
* 2)否则使用 workQueue.take() 读取任务
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中的工作者线程集合,只有在持有 mainLock 时才能访问
*/
private final HashSet<Worker> workers = new HashSet<>();
/**
* 执行 awaitTermination 操作时的条件
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪线程池同时存在的最大工作线程数
* Accessed only under mainLock.
*/
private int largestPoolSize;
/**
* 线程池完成的任务数,只在工作者线程退出时更新
* Accessed only under mainLock.
*/
private long completedTaskCount;
/**
* 任务队列,
* 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务
* 2)否则使用 workQueue.take() 读取任务
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中的工作者线程集合,只有在持有 mainLock 时才能访问
*/
private final HashSet<Worker> workers = new HashSet<>();
/**
* 执行 awaitTermination 操作时的条件
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪线程池同时存在的最大工作线程数
* Accessed only under mainLock.
*/
private int largestPoolSize;
/**
* 线程池完成的任务数,只在工作者线程退出时更新
* Accessed only under mainLock.
*/
private long completedTaskCount;
/**
* 创建工作者线程的工厂,工作者线程创建失败会导致任务丢失
*/
private volatile ThreadFactory threadFactory;
/**
* 线程池满载或关闭过程中,任务被拒绝时的处理器
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲工作者线程的超时时间,以纳秒为单位。
* 1)当前工作者线程数 > 核心线程数
* 2)允许核心工作者线程超时 allowCoreThreadTimeOut=true
*/
private volatile long keepAliveTime;
/**
* 默认为 false,即使超时了,核心工作者线程也不会退出
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心工作者线程数
*/
private volatile int corePoolSize;
/**
* 最大工作者线程数
*/
private volatile int maximumPoolSize;
/**
* 默认的拒绝处理器
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), ThreadPoolExecutor.defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, ThreadPoolExecutor.defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 使用指定的初始化参数创建一个 ThreadPoolExecutor 实例
*
* @param corePoolSize 核心工作者线程所
* @param maximumPoolSize 最大工作者线程数
* @param keepAliveTime 工作者线程存活时间
* @param unit 时间单位
* @param workQueue 工作队列
* @param threadFactory 创建工作者线程的线程工厂
* @param handler 拒绝处理器
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
/**
* 必须保证
* corePoolSize >=0
* maximumPoolSize > 0
* maximumPoolSize > corePoolSize
* keepAliveTime > 0 表示工作者线程可超时退出
* keepAliveTime = 0 表示不可退出
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
acc = System.getSecurityManager() == null
? null
: AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可提交的任务类型
Runnable 接口无返回值并且不能显示抛出异常。
Callable 接口有返回值,并且能显示抛出异常。
@FunctionalInterface
public interface Runnable {
void run();
}
@FunctionalInterface
public interface Callable<V> {
/**
* 计算并返回的一个结果,如果计算失败,则抛出异常
*/
V call() throws Exception;
}
执行一个 Runnable 任务,无返回值
/**
* 往线程池提交一个 Runnable 任务,
* 如果线程池已满或线程池关闭则,该任务会交给拒绝处理器处理。
*/
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 读取控制变量
int c = ctl.get();
// 1)线程池工作线程数 < 核心线程数
if (ThreadPoolExecutor.workerCountOf(c) < corePoolSize) {
// 尝试创建一个新的工作者线程来处理这个任务
if (addWorker(command, true)) {
// 创建成功则直接返回
return;
}
// 创建失败,则重新读取控制变量
c = ctl.get();
}
/**
* 2)当前工作者线程数 >= 核心工作者线程
* && 线程池处于运行状态
* && 尝试向工作者队列中提交任务
*/
if (ThreadPoolExecutor.isRunning(c) && workQueue.offer(command)) {
// 重新读取控制变量
final int recheck = ctl.get();
// 1)如果线程池已经停止运行,则将目标任务从任务队列中移除,并尝试终止线程池
if (! ThreadPoolExecutor.isRunning(recheck) && remove(command)) {
// 执行拒绝处理器
reject(command);
// 2)如果已经没有可用的工作者线程
} else if (ThreadPoolExecutor.workerCountOf(recheck) == 0) {
// 尝试添加一个新的工作者线程
addWorker(null, false);
}
}
/**
* 3)当前工作者线程数 >= 核心工作者线程
* && 工作队列已满
* && 尝试增加一个新的工作者线程来处理该任务
*/
else if (!addWorker(command, false)) {
// 任务处理失败,则交给拒绝处理器处理
reject(command);
}
}
/**
* 读取线程池的工作线程数
*/
private static int workerCountOf(int c) { return c & ThreadPoolExecutor.COUNT_MASK; }
/**
* 尝试增加一个核心工作者线程来处理这个任务
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
/**
* 1)线程池状态在 STOP 及以上【线程池已经停止】
* 2)线程池正在停止,并且提交任务不为 null || 工作队列为空
* 则创建失败
*/
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)
&& (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP)
|| firstTask != null
|| workQueue.isEmpty())) {
return false;
}
for (;;) {
/**
* 1)工作者线程数已经 >= 核心线程数【任务队列未满时】
* 2)工作者线程数已经 >= 最大线程数【任务队列已满时】
* 则创建失败
*/
if (ThreadPoolExecutor.workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & ThreadPoolExecutor.COUNT_MASK)) {
return false;
}
// 尝试递增工作者线程数
if (compareAndIncrementWorkerCount(c)) {
// 如果计数值递增成功,则将正式添加工作者线程来处理任务
break retry;
}
// 如果其他线程优先递增了计数值,则重新读取计数值进行重试
c = ctl.get(); // Re-read ctl
// 线程池正在关闭,则重新进入循环后将直接退出
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN))
{
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
// 工作者线程是否已经启动
boolean workerStarted = false;
// 工作者线程是否已经添加到集合中
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作者线程
w = new Worker(firstTask);
// 读取线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired.
* 读取控制变量再次进行校验
*/
final int c = ctl.get();
/**
* 1)线程池处于运行状态
* 2)线程池处于关闭状态 && 提交任务为 null
*/
if (ThreadPoolExecutor.isRunning(c) ||
ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && firstTask == null) {
// 工作者线程已经启动
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
// 将工作者线程添加到集合中
workers.add(w);
// 如果当前当前工作者线程数 > largestPoolSize,则更新它
final int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
// 工作者线程添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,则启动工作者线程
if (workerAdded) {
t.start();
// 工作者线程启动成功
workerStarted = true;
}
}
} finally {
// 如果工作者线程启动失败,则进行回退和清理
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
// 运行状态 c 小于指定状态 s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 运行状态 c 大于等于指定状态 s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 尝试原子的将工作者线程数 +1
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1)从 workers 集合中移除工作者 w
if (w != null) {
workers.remove(w);
}
// 递减总工作者线程数
decrementWorkerCount();
// 尝试进行线程池终止
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* 将工作者线程总数递减 1
*/
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
final void tryTerminate() {
for (;;) {
final int c = ctl.get();
/**
* 1)线程池在运行状态【RUNNING】
* 2)线程池在执行清理操作【TIDYING】
* 3)线程池正在停止【SHUTDOWN】并且工作队列非空
* 直接返回
*/
if (ThreadPoolExecutor.isRunning(c) ||
ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.TIDYING) ||
ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && ! workQueue.isEmpty()) {
return;
}
/**
* 工作者线程数不为 0,则强制中断还在运行的工作者
*/
if (ThreadPoolExecutor.workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ThreadPoolExecutor.ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将控制变量设置为 TIDYING
if (ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TIDYING, 0))) {
try {
// 执行线程池的终止钩子函数
terminated();
} finally {
// 将控制变量设置为 TERMINATED
ctl.set(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TERMINATED, 0));
// 唤醒在 termination 阻塞的所有线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
/**
* 线程池是否在运行
*/
private static boolean isRunning(int c) {
return c < ThreadPoolExecutor.SHUTDOWN;
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有的工作者
for (final Worker w : workers) {
// 读取工作者驻留线程
final Thread t = w.thread;
// 如果其为中断,则获取锁并将线程中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (final SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果只中断一个工作者线程,则退出
if (onlyOne) {
break;
}
}
} finally {
mainLock.unlock();
}
}
/**
* 合并工作线程数和运行状态
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
提交一个 Runnable、Callable 任务,有返回值
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 将 Runnable 任务和 T 封装成一个 FutureTask 实例
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* 将一个 Callable 任务封装成一个 FutureTask 实例
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* 往线程池提交一个 Runnable 任务,无默认计算结果
*/
@Override
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
// 将 Runnable 任务封装成 RunnableFuture
final RunnableFuture<Void> ftask = newTaskFor(task, null);
// 执行任务
execute(ftask);
// 返回一个 Future 对象以异步读取计算结果
return ftask;
}
/**
* 往线程池提交一个 Runnable 任务,有默认计算结果
*/
@Override
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException();
}
final RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* 往线程池提交一个 Callable 任务,自带计算结果
*/
@Override
public <T> Future<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException();
}
final RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
/**
* 可取消的异步计算类
* 1)计算任务可以通过调用 cancel() 方法进行取消。
* 2)如果计算未完成,则 get() 操作将一直阻塞直到计算完成。
* 3)已经完成的计算不能取消、不能再次启动。
*/
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 任务的状态以及可能的状态转换:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
// 新建任务
private static final int NEW = 0;
// 任务正在执行
private static final int COMPLETING = 1;
// 任务正常执行完成
private static final int NORMAL = 2;
// 任务异常执行完成
private static final int EXCEPTIONAL = 3;
// 任务已经被取消
private static final int CANCELLED = 4;
// 任务正在中断
private static final int INTERRUPTING = 5;
// 任务已经中断
private static final int INTERRUPTED = 6;
/** 实际运行的任务载体 */
private Callable<V> callable;
/** 任务的计算结果或异常对象 */
private Object outcome;
/** 运行 Callable 计算任务的线程 */
private volatile Thread runner;
/** 在当前任务上阻塞等待的线程 */
private volatile WaitNode waiters;
/**
* 执行目标 Callable 任务并返回其计算结果
*/
public FutureTask(Callable<V> callable) {
if (callable == null) {
throw new NullPointerException();
}
this.callable = callable;
this.state = FutureTask.NEW;
}
/**
* 执行目标 Runnable 任务并返回指定的计算结果 result
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
// 写入任务状态
this.state = FutureTask.NEW;
}
}
Executors#
/**
* 将 Runnable 封装成一个【执行目标 Runnable 任务并返回计算结果 null 的 Callable 实例】
*/
public static Callable<Object> callable(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
return new RunnableAdapter<>(task, null);
}
/**
* Runnable 到 Callable 的适配器
*/
private static final class RunnableAdapter<T> implements Callable<T> {
// 目标任务的
private final Runnable task;
// 固定的计算结果
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
/**
* 运行目标任务并返回固定的计算结果
* created by ZXD at 9 Dec 2018 T 12:24:43
* @return
*/
@Override
public T call() {
task.run();
return result;
}
@Override
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 线程池中工作者的驻留线程,创建失败时为 null */
final Thread thread;
/** 第一个运行的任务,可能为 null */
Runnable firstTask;
/** 每个驻留线程完成的任务数,在线程退出时会累加到线程池中 */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* 基于指定的初始任务和线程工厂创建工作者线程
*/
Worker(Runnable firstTask) {
// 禁止中断,直到工作者线程运行为止
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
/**
* Worker 本身实现了 Runnable 并且重写了 run 方法,
* 基于 Worker 创建驻留线程,并启动运行。
*/
thread = getThreadFactory().newThread(this);
}
/** 运行工作者线程 */
@Override
public void run() {
runWorker(this);
}
}
Executors#
/**
* 默认的线程工厂
*/
private static class DefaultThreadFactory implements ThreadFactory {
// 线程池计数器
private static final AtomicInteger poolNumber = new AtomicInteger(1);
// 线程组
private final ThreadGroup group;
// 工作者线程计数器
private final AtomicInteger threadNumber = new AtomicInteger(1);
// 工作者线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
// 读取安全管理器
final SecurityManager s = System.getSecurityManager();
// 读取线程组
group = s != null ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 写入工作者线程名称前缀,如:pool-1-thread-
namePrefix = "pool-" +
DefaultThreadFactory.poolNumber.getAndIncrement() +
"-thread-";
}
/**
* 基于目标 Runnable 创建线程
* created by ZXD at 9 Dec 2018 T 12:55:21
* @param r
* @return
*/
@Override
public Thread newThread(Runnable r) {
// 创建线程
final Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 设置为非守护线程
if (t.isDaemon()) {
t.setDaemon(false);
}
// 设置线程优先级为 Thread.NORM_PRIORITY
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
工作者线程核心逻辑
/**
* 工作者线程的核心循环,重复的从任务队列中读取任务并执行。
*/
final void runWorker(Worker w) {
// 读取当前线程
final Thread wt = Thread.currentThread();
// 读取第一个任务
Runnable task = w.firstTask;
// 清理
w.firstTask = null;
w.unlock(); // 允许中断
/**
* 是否异常退出
* 1)前置钩子函数抛出异常
* 2)任务执行时抛出异常
* 3)后置钩子函数抛出异常
*/
boolean completedAbruptly = true;
try {
// 1)尝试从工作队列中读取任务
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* If pool is stopping, ensure thread is interrupted;
* if not, ensure thread is not interrupted.
* This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
*
* 1)如果线程池已经停止 && 当前线程未被中断,则中断当前线程
*
*/
if ((ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP) ||
Thread.interrupted() &&
ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP)) &&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
/**
* 线程池钩子函数,在每个任务执行之前触发
*/
beforeExecute(wt, task);
try {
task.run();
/**
* 线程池钩子函数,在每个任务执行之后或执行异常时触发
*/
afterExecute(task, null);
} catch (final Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 将当前任务置空
task = null;
// 递增累积完成任务数
w.completedTasks++;
w.unlock();
}
}
// 正常完成任务
completedAbruptly = false;
} finally {
// 处理工作者线程退出后的统计和清理工作
processWorkerExit(w, completedAbruptly);
}
}
/**
* 阻塞读取任务或超时读取任务。
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
// 是否是超时读取任务
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 读取控制变量
final int c = ctl.get();
/**
* 1)线程池已经停止
* 2)线程池正在停止 && 任务队列为空
* 都需要返回 null 以终止当前工作者线程
*/
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)
&& (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 计算当前工作者线程数
final int wc = ThreadPoolExecutor.workerCountOf(c);
/**
* 是否允许当前工作者线程退出
* 1)允许核心工作者线程退出
* 2)当前工作者线程数 > 核心工作者线程数
*/
final boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 1)当前工作者线程数 > 最大工作者线程数 ||
* 2)允许工作者线程退出 && 当次拉取任务超时
* 3)当前工作者线程数 > 1 || 任务队列为空
*/
if ((wc > maximumPoolSize || timed && timedOut)
&& (wc > 1 || workQueue.isEmpty())) {
// 递减工作者线程数
if (compareAndDecrementWorkerCount(c)) {
// 返回 null 以终止该工作者线程
return null;
}
continue;
}
try {
/**
* 1)如果是超时模式,则尝试在 keepAliveTime 纳秒内读取任务,允许当前工作者退出
* 2)否则,阻塞读取任务【不允许当前工作者退出】
*/
final Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 返回任务不为 null,则执行它
if (r != null) {
return r;
}
// 设置超时拉取标识,第二次循环中当前工作者可能退出
timedOut = true;
} catch (final InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是异常退出,则递减工作者线程数
if (completedAbruptly) {
decrementWorkerCount();
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将当前工作者 w 完成的任务数累加到线程池已完成任务数中
completedTaskCount += w.completedTasks;
// 从工作者集合中删除该工作者
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
final int c = ctl.get();
// 线程池处于 RUNNING 或 SHUTDOWN
if (ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP)) {
// 如果不是异常退出
if (!completedAbruptly) {
/**
* 计算需要保留的最小工作者线程数,如果允许核心工作者线程退出则为 0;
* 否则为 corePoolSize
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 任务队列不为空,则至少保留一个工作者线程
if (min == 0 && ! workQueue.isEmpty()) {
min = 1;
}
// 已有工作者线程 > 期望工作者线程数,则直接返回
if (ThreadPoolExecutor.workerCountOf(c) >= min)
{
return; // replacement not needed
}
}
// 否则尝试新增工作者线程
addWorker(null, false);
}
}
工作者线程退出的情况
1)线程池前置钩子函数 beforeExecute 或后置钩子函数 afterExecute 执行抛出异常
2)任务运行过程中出现异常
3)允许核心工作者线程退出 && 在 keepAliveTime 纳秒内没有读取到任何任务 && (当前工作者线程数 > 1 || 任务队列为空)
4)当前工作者线程数 > 核心工作者线程数 && 在 keepAliveTime 纳秒内没有读取到任何任务 && (当前工作者线程数 > 1 || 任务队列为空)
线程池拒绝策略
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* 如果线程池还在运行,则在任务提交线程中运行被拒绝的任务
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 默认的拒绝执行处理器
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* 不管线程池的运行状态,丢弃被拒绝的任务,并抛出 RejectedExecutionException 异常
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**
* 静默丢弃被拒绝的任务
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**
* 如果线程池还在运行,则拉取并丢弃下一个任务,并将被拒绝的任务重新提交
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
线程池的关闭
@Override
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前线程是否允许关闭线程池
checkShutdownAccess();
// 将线程池状态更新为 SHUTDOWN
advanceRunState(ThreadPoolExecutor.SHUTDOWN);
// 中断所有空闲工作者,正在处理任务的工作者线程可以继续运行
interruptIdleWorkers();
// 执行钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
@Override
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前线程是否允许关闭线程池
checkShutdownAccess();
// 将线程池状态更新为 STOP
advanceRunState(ThreadPoolExecutor.STOP);
// 强制中断所有工作者线程,包括正在执行任务的线程
interruptWorkers();
// 读取所有未完成的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 返回所有未完成的任务
return tasks;
}
预启动核心工作者线程,以提高响应速度
/**
* 尝试预先启动一个核心工作者线程,阻塞等待获取任务,启动成功返回 true
*/
public boolean prestartCoreThread() {
return ThreadPoolExecutor.workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* 预启动所有核心工作者线程,并返回实际启动的线程数
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true)) {
++n;
}
return n;
}
状态查询和参数更新
/**
* 线程池是否处于 SHUTDOWN 及以上状态
*/
@Override
public boolean isShutdown() {
return ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.SHUTDOWN);
}
/**
* 线程池是否处于 STOP 及以上状态
*/
boolean isStopped() {
return ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP);
}
/**
* 线程池是否正在停止
*/
public boolean isTerminating() {
final int c = ctl.get();
return ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN) && ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.TERMINATED);
}
/**
* 线程池是否已经停止
*/
@Override
public boolean isTerminated() {
return ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.TERMINATED);
}
/**
* 更新核心线程数,可能导致工作线程增加或退出
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
// 计算增量
final int delta = corePoolSize - this.corePoolSize;
// 写入核心线程数
this.corePoolSize = corePoolSize;
// 如果旧的核心线程数比较大,则尝试中断空闲工作者
if (ThreadPoolExecutor.workerCountOf(ctl.get()) > corePoolSize) {
interruptIdleWorkers();
} else if (delta > 0) {
/**
* 计算所需的工作者线程,最大为 delta
*/
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
// 队列为空,则无需增加
if (workQueue.isEmpty()) {
break;
}
}
}
}
/**
* 设置最大工作者线程数
*/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
// 当前工作者线程数 > 新的最大工作者线程数
if (ThreadPoolExecutor.workerCountOf(ctl.get()) > maximumPoolSize) {
interruptIdleWorkers();
}
}
/**
* 更新空闲线程存活时间
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0) {
throw new IllegalArgumentException();
}
if (time == 0 && allowsCoreThreadTimeOut()) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 计算新的空闲线程存活时间
final long keepAliveTime = unit.toNanos(time);
// 计算增量
final long delta = keepAliveTime - this.keepAliveTime;
// 写入值
this.keepAliveTime = keepAliveTime;
if (delta < 0) {
interruptIdleWorkers();
}
}
/**
* 设置拒绝执行处理器
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null) {
throw new NullPointerException();
}
this.handler = handler;
}
/**
* 设置工作者线程工厂
*/
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException();
}
this.threadFactory = threadFactory;
}
/**
* 设置允许核心工作者线程退出,为 true 时 keepAliveTime 必须 > 0
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 尝试更新值
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value) {
interruptIdleWorkers();
}
}
}