本文首发于个人微信公众号《andyqian》,期待你的关注
前言
在上一篇文章《Java线程池ThreadPoolExecutor》中描述了ThreadPoolExecutor的基本概念,以及一些常用方法。这对于我们来说,是远远不够的,今天就一起来看TreadPoolExecutor类的内部实现。
线程池状态
在学习ThreadPoolExecutor源码时,首先来看看下面这段代码,也是非常重要的一段代码。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
这里使用 AtomicInteger 类型的 ctl 变量,同时记录线程池的运行状态以及线程池的容量。
其中:
-
高三位用来存储线程池运行状态,其余位数表示线程池的容量。
-
线程池状态分为RUNNING状态,SHUTDOWN 状态,STOP状态,TIDYING 状态,TERMINATED 状态。
分别如下所述:
RUNNIN状态
-
在该状态下,线程池接受新任务并会处理阻塞队列中的任务。
-
其二进制表示的,高三位值是 111。
源码:
private static final int RUNNING = -1 << COUNT_BITS;
其中 COUNT_BITS = 29,二进制表示如下:
1110 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN 状态
-
在该状态下,线程池不接受新任务,但会处理阻塞队列中的任务。
-
其二进制的高三位为: 000。
源码:
private static final int SHUTDOW = -1 << COUNT_BITS;
其中 COUNT_BITS = 29,二进制如下所述:
0000 0000 0000 0000 0000 0000 0000 0000
STOP 状态
-
在该状态下,线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务。
-
其二进制的高三位为: 010。
源码:
private static final int STOP = 1 << COUNT_BITS;
其中 COUNT_BITS = 29,二进制如下所述:
010 0000 0000 0000 0000 0000 0000 0000
TIDYING状态
-
所有任务都执行完成,且工作线程数为0,将要调用terminated方法。
-
其二进制的高三位为: 010。
源码:
private static final int TIDYING = 2<< COUNT_BITS;
其中 COUNT_BITS = 29,二进制如下所述:
0100 0000 0000 0000 0000 0000 0000 0000
TERMINATED状态
-
最终状态,为执行terminated()方法后的状态。
-
二进制的高三位为110。
源码:
private static final int TERMINATE = 3 << COUNT_BITS;
其中 COUNT_BITS = 29,二进制如下所述:
1100 0000 000 0000 0000 0000 0000 0000
其状态的转换关系如下:
当调用:shutdown() 方法时,其状态由 RUNNING 状态 转换为 SHUTDOWN (状态)。
当调用:shutdownNow() 方法是,其状态由 (RUNNING or SHUTDOWN) 转换为 STOP。
当阻塞队列与线程池两者均为空时,状态由 SHUTDOWN 转换为 TIDYING。
当线程池任务为空时,状态由 STOP 转换为 TIDYING 。
当 terminated() 方法执行完成后,状态由 TIDYING 转换为 TERMIN。
execute 执行方法
下面方法是执行任务的方法,代码不难,其核心逻辑如下所示:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 判断workerCount线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 当线程池处于运行状态,并且向workQueue中添加执行任务。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查(并发考虑),当线程池处于非running状态时,则从workQueue移除刚添加的任务。并执reject策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//当线程池中的workerCount为0时,此时workQueue中还有待执行的任务,则新增一个addWorker,消费workqueue中的任务。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 当队列满时,则调用addWorker方法。如果addWorker失败,表示当前执行任务超过了当前workerQueue容量,且工作线程数数大于maximumPoolSize,则执行reject策略。
else if (!addWorker(command, false))
reject(command);
}
其处理逻辑是:
-
当待执行任务为空时,则抛出 NullPointerExecption 异常。
-
通过workerCountOf方法获取线程池的工作线程数,当其小于核心线程数时,则通过addWorker方法添加一个核心线程,并将任务给之执行。
-
当工作线程数大于corePoolSize时,则判断线程池是否处于运行状态,并向workQueue中添加任务。基于防止并发的目的,进行了双重检查,如果线程池处于非运行状态且remove任务失败时,则执行reject方法。
-
当工作线程为0时,则调用addWorker方法,创建worker消费workqueue存在的task。
-
当workQueue满时,则调用addWorker方法进行添加worker。如果addWorker失败,则说明workQueue已满,且线程池工作数已大于maximumPoolSize,则执行reject方法。
addWorker 方法
我们现在将目光移到addWorker方法上,其源码如下所示:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty(
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果workcount数大于线程池最大值,或者大于corePoolSize,maximumPoolSize时,则返回false。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 进行WorkCount的 CAS 操作,并结束循环。
if (compareAndIncrementWorkerCount(c))
break retry;
// 当CASC操作失败,且运行状态已改变时,则继续执行CAS操作。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 其中Worker 为实现 AbstractQueuedSynchronizer 和 Runnable 的内部类
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁,控制并发
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 校验线程是否处于处于活跃状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将 worker 添加到线程池中,其实现为:HashSet<Worker> workers = new HashSet<Worker>();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功时,则调用start进行执行。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程未启动成功,则执行addWorkerFailed方法。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker 类
构造函数:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//通过ThreadFactory()工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
因为 Worker 实现了Runable接口,在调用start()方法候,实际执行的是run方法,代码如下所示:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
while (task != null || (task = getTask()) != null) {
w.lock();
//
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前的操作方法,空实现。可根据需求进行实现。
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后方法,空实现。可根据实际需求进行实现。
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
最后
上面简单分析了ThreadPoolExecutor源码,其中涉及到比较多的知识点,有锁,有位运算等等。如果不清楚的,可以先忽略掉,对主要流程理清楚后,再回过头来看看不清楚的知识点,这算是我看源码时的一个小方法。
相关阅读: