本文首发于个人微信公众号《andyqian》,期待你的关注

前言

  在上一篇文章《Java线程池ThreadPoolExecutor》中描述了ThreadPoolExecutor的基本概念,以及一些常用方法。这对于我们来说,是远远不够的,今天就一起来看TreadPoolExecutor类的内部实现。

线程池状态

  在学习ThreadPoolExecutor源码时,首先来看看下面这段代码,也是非常重要的一段代码。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

这里使用 AtomicInteger 类型的 ctl 变量,同时记录线程池的运行状态以及线程池的容量。

其中

  1. 高三位用来存储线程池运行状态,其余位数表示线程池的容量。

  2. 线程池状态分为RUNNING状态,SHUTDOWN 状态,STOP状态,TIDYING 状态,TERMINATED 状态。

分别如下所述:

RUNNIN状态  

  1. 在该状态下,线程池接受新任务并会处理阻塞队列中的任务。

  2. 其二进制表示的,高三位值是 111。

源码:

private static final int RUNNING    = -1 << COUNT_BITS;

其中 COUNT_BITS = 29,二进制表示如下:

1110 0000 0000 0000 0000 0000 0000 0000

SHUTDOWN 状态  

  1. 在该状态下,线程池不接受新任务,但会处理阻塞队列中的任务。

  2. 其二进制的高三位为: 000。

源码:

private static final int SHUTDOW = -1 << COUNT_BITS;

其中 COUNT_BITS = 29,二进制如下所述:

0000 0000 0000 0000 0000 0000 0000 0000

STOP 状态  

  1. 在该状态下,线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务。

  2. 其二进制的高三位为: 010。

源码:

private static final int STOP  = 1 << COUNT_BITS;

其中 COUNT_BITS = 29,二进制如下所述:

010 0000 0000 0000 0000 0000 0000 0000

TIDYING状态  

  1. 所有任务都执行完成,且工作线程数为0,将要调用terminated方法。

  2. 其二进制的高三位为: 010。

源码:

private static final int TIDYING  = 2<< COUNT_BITS;

其中 COUNT_BITS = 29,二进制如下所述:

0100 0000 0000 0000 0000 0000 0000 0000

TERMINATED状态  

  1. 最终状态,为执行terminated()方法后的状态。

  2. 二进制的高三位为110。

源码:

private static final int TERMINATE = 3 << COUNT_BITS;

其中 COUNT_BITS = 29,二进制如下所述:

1100 0000 000 0000 0000 0000 0000 0000

其状态的转换关系如下:

当调用:shutdown() 方法时,其状态由 RUNNING 状态 转换为 SHUTDOWN (状态)。

当调用:shutdownNow() 方法是,其状态由 (RUNNING or SHUTDOWN) 转换为 STOP。

当阻塞队列与线程池两者均为空时,状态由 SHUTDOWN 转换为 TIDYING。

当线程池任务为空时,状态由 STOP 转换为 TIDYING 。

当 terminated() 方法执行完成后,状态由 TIDYING 转换为 TERMIN。

execute 执行方法

  下面方法是执行任务的方法,代码不难,其核心逻辑如下所示:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 判断workerCount线程数是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 当线程池处于运行状态,并且向workQueue中添加执行任务。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检查(并发考虑),当线程池处于非running状态时,则从workQueue移除刚添加的任务。并执reject策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //当线程池中的workerCount为0时,此时workQueue中还有待执行的任务,则新增一个addWorker,消费workqueue中的任务。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 当队列满时,则调用addWorker方法。如果addWorker失败,表示当前执行任务超过了当前workerQueue容量,且工作线程数数大于maximumPoolSize,则执行reject策略。
        else if (!addWorker(command, false))
            reject(command);
    }

其处理逻辑是:

  1. 当待执行任务为空时,则抛出 NullPointerExecption 异常。

  2. 通过workerCountOf方法获取线程池的工作线程数,当其小于核心线程数时,则通过addWorker方法添加一个核心线程,并将任务给之执行。

  3. 当工作线程数大于corePoolSize时,则判断线程池是否处于运行状态,并向workQueue中添加任务。基于防止并发的目的,进行了双重检查,如果线程池处于非运行状态且remove任务失败时,则执行reject方法。

  4. 当工作线程为0时,则调用addWorker方法,创建worker消费workqueue存在的task。

  5. 当workQueue满时,则调用addWorker方法进行添加worker。如果addWorker失败,则说明workQueue已满,且线程池工作数已大于maximumPoolSize,则执行reject方法。

addWorker 方法

  我们现在将目光移到addWorker方法上,其源码如下所示:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty(
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果workcount数大于线程池最大值,或者大于corePoolSize,maximumPoolSize时,则返回false。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 进行WorkCount的 CAS 操作,并结束循环。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 当CASC操作失败,且运行状态已改变时,则继续执行CAS操作。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          // 其中Worker 为实现 AbstractQueuedSynchronizer 和 Runnable 的内部类
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加锁,控制并发
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 校验线程是否处于处于活跃状态
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将 worker 添加到线程池中,其实现为:HashSet<Worker> workers = new HashSet<Worker>();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加成功时,则调用start进行执行。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程未启动成功,则执行addWorkerFailed方法。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker 类  

构造函数:

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //通过ThreadFactory()工厂创建线程
            this.thread = getThreadFactory().newThread(this);
        }

因为 Worker 实现了Runable接口,在调用start()方法候,实际执行的是run方法,代码如下所示:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行前的操作方法,空实现。可根据需求进行实现。
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //任务执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       // 执行后方法,空实现。可根据实际需求进行实现。
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }

最后

  上面简单分析了ThreadPoolExecutor源码,其中涉及到比较多的知识点,有锁,有位运算等等。如果不清楚的,可以先忽略掉,对主要流程理清楚后,再回过头来看看不清楚的知识点,这算是我看源码时的一个小方法。


相关阅读:

Java线程池ThreadPoolExecutor

使用 Mybatis 真心不要偷懒!

再谈Java 生产神器 BTrace

ThreadPoolExecutor 原理解析-LMLPHP

06-15 05:36