ThreadPoolExecutor 线程池源码学习

ThreadPoolExecutor 线程池源码学习-LMLPHP

1.阅读源码

1.ThreadPoolExecutor.execute

   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl 高三位记录线程状态。低29位记录线程池中线程数
        int c = ctl.get();
        //位运算获取工作线程数 如果少于核心线程数,继续创建核心线程
        if (workerCountOf(c) < corePoolSize) {
        		//当前线程池添加线程任务  两个参数的意义放后面在看
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果当前线程池状态为 RUNNING 状态,将当前线程任务添加到等待队列中。调用的offer方法,队满返回false
        if (isRunning(c) && workQueue.offer(command)) {
        		//再次获取ctl
            int recheck = ctl.get();
            //如果当前线程池状态已经发送变更 不在是 RUNNING 状态 ,则调用remove方法删除刚刚添加到队列中的线程任务
            if (! isRunning(recheck) && remove(command))
            		//调用拒绝策略
                reject(command);
            //如果当前活跃线程数等于0
            else if (workerCountOf(recheck) == 0)
            		//当前线程池添加线程任务
                addWorker(null, false);
        }
        //添加非核心线程
        else if (!addWorker(command, false))
            reject(command);
    }

2. ThreadPoolExecutor.addWorker

 private boolean addWorker(Runnable firstTask, boolean core) {
 				//标记外部循环
        retry:
        for (;;) {
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //线程池状态检查
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
						
						//内部循环
            for (;;) {
            		//获取活跃线程数
                int wc = workerCountOf(c);
                //如果大于最大上限 或 大于 核心线程数
                if (wc >= CAPACITY ||
                		// core -> addworker方法的第二个参数
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 添加失败返回
                    return false;
                //将当前线程池活跃线程数加一
                if (compareAndIncrementWorkerCount(c))
                		//跳出外部循环
                    break retry;
                //如果活跃线程数修改失败  再次检查线程池状态
                c = ctl.get();  // Re-read ctl
                //发生变化
                if (runStateOf(c) != rs)
                //结束本次外部循环,进入下一次循环,直到修改成功或返回false
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

				//结束循环到此
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        		//创建新的线程任务
            w = new Worker(firstTask);
            //从worker中获取当前线程
            final Thread t = w.thread;
            //如果不为null
            if (t != null) {
            		//上锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
										//如果线程池状态是Running 或 (SHUTDOWN,但是任务为null)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //当前线程是活跃的(新建线程不会被启动)
                        if (t.isAlive()) // precheck that t is startable
                        		//抛出异常
                            throw new IllegalThreadStateException();
                        //添加到workers容器中
                        workers.add(w);
                        //获取当前线程池中所有的worker数量
                        int s = workers.size();
                        //记录历史最大线程数	
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                		//解锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                		//启动当前线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3.Worker.run

public void run() {
            runWorker(this);
        }	
 final void runWorker(Worker w) {
 				//当前线程
        Thread wt = Thread.currentThread();
        //线程任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        		//如果当前worker包装的task不为null,执行当前task 如果为null,则调用getTask()从阻塞队列中获取线程任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                		//线程任务执行的前置方法  默认空方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    		//真正执行线程任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        		//线程任务的后置处理方法
            processWorkerExit(w, completedAbruptly);
        }
    }

4.ThreadPoolExecutor.getTask

private Runnable getTask() {
				//是否超时
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            //当前线程池状态
            int rs = runStateOf(c);

           
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

						//当前线程池中活跃的线程数量
            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 是否需要进行超时控制 默认为false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
						
						//compareAndDecrementWorkerCount 是将活跃线程数减一
						//满足以下情况将执行 :
						// 1.活跃线程大于最大线程数 或者 上次从队列中获取超时
						// 2.活跃线程大于1,但等待队列是空的
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //将活跃线程数减一
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            		//上次从等待队列中获取是否超时? poll 队列为空时超时等待 : take 队列为空时一直等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                		//获取成功返回线程任务
                    return r;
                //获取失败标记超时
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

5.ThreadPoolExecutor.processWorkerExit

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //从Set集合中移除已执行run方法的 worker,让jvm进行回收
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        		//是否抛出异常
            if (!completedAbruptly) {
            		//线程替换逻辑判断
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //woker异常补偿
            addWorker(null, false);
        }
    }

2. 源码分析

1.核心线程和非核心线程的区别

//添加核心线程任务 command 为线程任务
addWorker(command, true)

//添加非核心线程任务 command 为线程任务
addWorker(command, false)

创建时机

 //线程池中活跃线程数少于核心线程数则调用addWorker(command, true)创建核心线程
 if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
        			....
        }
if (isRunning(c) && workQueue.offer(command)) {

// workqueue调用offer方法返回false则会调用addWorker(command, false),尝试创建非核心线程
}else if (!addWorker(command, false))


}

消亡时机 java.util.concurrent.ThreadPoolExecutor#getTask

  for (;;) {
  	boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  	
  	 if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            
    
      try {
          Runnable r = timed ?
              workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
              workQueue.take();
          if (r != null)
              return r;
          timedOut = true;
      } catch (InterruptedException retry) {
          timedOut = false;
      }
  	
  	}

当创建的是核心线程时,如果没有设置allowCoreThreadTimeOut的值true(默认值为false),则将调用queue.take方法,一直阻塞,直到队列中有任务。如果是临时线程,则调用queue.poll方法,在规定时间内未从队列中获取到线程任务,在将活跃线程数量减去一后直接返回null。只要不返回null,将会在java.util.concurrent.ThreadPoolExecutor#runWorker 一直循环获取任务并执行,而一旦超时返回null,将跳出runWorker方法,调用processWorkerExit 对临时线程所属的worker进行回收。

runWorker:

 while (task != null || (task = getTask()) != null){

		getTask:
			for (;;) {
			
			
			}

}

2.线程任务抛出异常怎么办

阅读源码部分我们看的是调用线程的execute方法,java.util.concurrent.ThreadPoolExecutor#runWorker 抛出异常后,completedAbruptly为true,java.util.concurrent.ThreadPoolExecutor#processWorkerExit

boolean completedAbruptly = true;
try {

	  try {
	  		task.run();
	  } catch (RuntimeException x) {
              thrown = x; throw x;
          } catch (Error x) {
              thrown = x; throw x;
          } catch (Throwable x) {
              thrown = x; throw new Error(x);
          } finally {
              afterExecute(task, thrown);
          }


		 completedAbruptly = false;

} finally {
            processWorkerExit(w, completedAbruptly);
        }
  try {
            completedTaskCount += w.completedTasks;
       				//移除老的worker
       				workers.remove(w);
        } finally {
            mainLock.unlock();
        }
 
 if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //添加一个新的worker。注意参数一task为null,将直接去队列中获取线程任务
            addWorker(null, false);
        }

代码演示

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,
                10,3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100),
                new DefaultThreadFactory("main"),
                new ThreadPoolExecutor.DiscardPolicy()
        );

        for (int i = 0; i < 2; i++) {
            threadPoolExecutor.execute(()->{
                System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
                throw new RuntimeException();
            });
        }
        
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }


        for (int i1 = 0; i1 < 2; i1++) {
            threadPoolExecutor.execute(()-> {
                System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
            });
        }

输出结果. 3,4线程由于1,2抛出异常,已经被创建

Thread.currentThread().getName() = main-1-2
Thread.currentThread().getName() = main-1-1
Exception in thread "main-1-1" Exception in thread "main-1-2" java.lang.RuntimeException
	at org.example.Main.lambda$main$0(Main.java:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:750)
java.lang.RuntimeException
	at org.example.Main.lambda$main$0(Main.java:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:750)
Thread.currentThread().getName() = main-1-5
Thread.currentThread().getName() = main-1-6

所以当使用executor执行线程任务时,要避免抛出异常,对可能出现的异常try{}catch{}包裹处理

07-09 21:26