前言
在熟练掌握如何使用线程池之后,我们来对ThreadPoolExecutor进行源码分析。希望大家保持对源码的阅读热情,不仅要知其然,也要知其所以然。阅读源码比较苦涩,请养成反复研究琢磨为什么这么写的精神,多推敲。冲鸭!
其实有时候想不通的时候可以看一下英文注释,还是作者解释的精准
1 ThreadPoolExecutor类图
2 ThreadPoolExecutor重要变量
2.1 ctl
这个变量是整个类的核心,AtomicInteger保证了原子性,这个变量存储了2个内容
- 线程池的状态
- 所有工作线程的数量
// int是4个字节,有32位,这里的ctl前3位表示线程池的状态,后29位标识工作线程的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer.SIZE - 3 = 29 private static final int COUNT_BITS = Integer.SIZE - 3; // 容量 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 运行中状态 111 00000000000000000000000000000 (-536870912) 括号内为十进制的 private static final int RUNNING = -1 << COUNT_BITS; // 关闭状态 000 00000000000000000000000000000 (0) private static final int SHUTDOWN = 0 << COUNT_BITS; // 停止状态 001 00000000000000000000000000000 (536870912) private static final int STOP = 1 << COUNT_BITS; // 整理状态 010 00000000000000000000000000000 (1073741824) private static final int TIDYING = 2 << COUNT_BITS; // 终结状态 011 00000000000000000000000000000 (1610612736) private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 先非然后位与运算符获取线程池运行的状态,也就是前3位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 位与运算符获取工作线程数量,也就是后29位 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的状态
- RUNNING:接收任务,处理workQueue队列里的任务
- SHUTDOWN:不再接收新的任务,但是处理workQueue队列里的任务
- STOP:拒绝新任务并且抛弃队列里的任务
- TIDYING:将要调用terminated方法
- TERMINATED:终结状态
2.2 Woker静态内部类
Worker实现了Runnable接口,说明可以当做一个可执行的任务。Woker也继承了AbstractQueuedSynchronizer,说明可以实现锁的功能,他是一个简单的不可重入的互斥锁,工作线程执行任务的时候,会先加锁,如果想要中断工作线程,需要先获取锁,否则无法中断,工作线程执行完任务才会释放锁,然后接着从workQueue获取任务继续执行。Worker的主要作用是执行队列的任务,并管理工作线程和统计一些东西。
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ // 工作线程 final Thread thread; /** Initial task to run. Possibly null. */ // 第一个任务 Runnable firstTask; /** Per-thread task counter */ // 该工作线程已经完成任务的数量 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // 直到runWorker方法禁止被中断 setState(-1); this.firstTask = firstTask; // 从线程工厂获取线程,并把第一个任务给worker this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
3 ThreadPoolExecutor重要函数
3.1 execute(Runnable command)
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 获取线程池状态和线程数 int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 添加工作线程 if (addWorker(command, true)) return; // 添加工作线程失败则重新获取线程池状态和线程数 c = ctl.get(); } // 线程池正在运行且任务可以将任务放进队列里 if (isRunning(c) && workQueue.offer(command)) { // 重新检查 - 获取线程池状态和线程数 int recheck = ctl.get(); // 这里重新检查是为了以下2种情况 // 1.当offer方法执行之后,线程池关闭了,回滚之前放入队列的操作并拒绝任务 if (! isRunning(recheck) && remove(command)) reject(command); // 2.线程池里没有可用的消费线程,比如现在核心线程数就1个,前一个任务抛异常了 // 那么现在就没有可用的消费线程了,所以要判断还有没有Worker,这步很关键 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 新增线程失败则拒绝任务 else if (!addWorker(command, false)) reject(command); }
3.2 addWorker(Runnable firstTask, boolean core)
/* * Methods for creating, running and cleaning up after workers */ /** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: // 外循环 for (;;) { // 获取线程池状态和线程数 int c = ctl.get(); // 线程池状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 这里我做了一个小调整,看着舒服点,以下几种情况会返回false // 1.线程池状态为STOP,TIDYING,TERMINATED // 2.线程池状态为SHUTDOWN且工作线程的firstTask不为空 // 3.线程池状态为SHUTDOWN且队列为空 if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) return false; // 内循环 for (;;) { // 获取工作线程数 int wc = workerCountOf(c); // 如果工作线程大于容量或者工作线程大于核心线程数(或者最大线程数)返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 添加工作线程+1 if (compareAndIncrementWorkerCount(c)) break retry; // 重新获取线程池状态和线程数 c = ctl.get(); // Re-read ctl // 如果线程池状态变了,那么重新走外循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop // 如果CAS操作失败,那么重新走内循环 } } // 线程是否开始工作 boolean workerStarted = false; // 线程是否添加到工作线程集合 boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 利用显式锁加锁添加Worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 如果线程池状态是RUNNING或者是SHUTDOWN&&第一个任务为空 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 检查这个线程是否处于活动状态 - RUNNABLE或者RUNNING if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //添加到工作线程集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 如果添加到工作线程集合则开始工作 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果线程没有开始工作,那么工作线程数量-1 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
3.3 runWorker(Worker w)
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { // 此处获取的wt就是Worker里的thread Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 这里为什么要先unlock一下呢?到这一行代码为止,我们没有进行任何的任务处理 // Worker的构造函数中,setState(-1);这一行代码抑制了线程中断,所以这里需要unlock从而允许中断 w.unlock(); // allow interrupts // 是否是异常终止的标识,默认为true。有2中情况为true // 1.执行任务抛出了异常 // 2.worker被中断 boolean completedAbruptly = true; try { // 获取任务,如果getTask()方法返回null,那么随之worker也要-1,之后有getTask()方法分析 // 只有在等待从workQueue队列里获取任务的时候才能中断。 // 第一次执行传入的任务,之后从workQueue队列里获取任务,如果队列为空则等待keepAliveTime这么久 while (task != null || (task = getTask()) != null) { // 加锁的目的在于防止在执行任务的时候,中断当前worker w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 这个方法比较重要,当线程池正在关闭,确保worker被中断 // 有2次runStateAtLeast(ctl.get(), STOP)方法调用是因为double-check // 第2次检查Thread.interrupted(),该方法会直接擦除线程的interrupt标识 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行任务之前的操作,如统计日志等,子类自己实现 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 将异常包装成Error抛出 thrown = x; throw new Error(x); } finally { // 执行任务之前的操作,如统计日志等,子类自己实现 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 解锁,一次任务的执行结束 w.unlock(); } } completedAbruptly = false; } finally { // 结束worker的清理工作 processWorkerExit(w, completedAbruptly); } }
3.4 getTask()
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 当线程池状态是STOP或者SHUTDOWN并且workQueue队列是空的,返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // timed用来判断该工作线程是否有超时控制? // allowCoreThreadTimeOut参数是是否允许核心线程也有keepAliveTime这么一个属性 // 核心线程默认是没有超时限制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 条件1:如果工作线程大于最大线程数或者超时了 // 条件2:如果工作线程大于1或者workQueue队列为空 // 满足以上2个条件则返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 一个是阻塞方法,一个是非阻塞方法,关键还是看timed这个变量,见上 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
3.5 shutdown
线程池将不会再接收新的任务,将先前放在队列中的任务执行完成。
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { // 获取显式锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查shutdown权限 checkShutdownAccess(); // 将线程池状态改为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲worker // 如果该线程正在工作,则不中断 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 保证workQueue里的剩余任务可以执行完 tryTerminate(); }
参考资料:
《Java concurrence in practice》