• 在并发大量异步任务处理程序中,每执行一个任务就需要创建一个线程,同时任务执行完毕之后需要将线程销毁.我们知道JVM创建线程的时候需要为其分配线程栈空间以及一些初始化操作,同时销毁的过程需要回收线程栈空间并由gc释放资源,期间都需要耗费一定的时间,因此一个任务的最终执行时间=创建线程newTime + 程序执行excuteTime + 线程销毁的gcTime,如果期间newTime+gcTime > excuteTime,那么这时候执行任务创建线程在程序中就显得十分不划算

  • 线程执行需要JVM分配线程栈空间,需要从系统内存申请,如果创建的线程过多,那么就容易造成内存空间不够用导致内存溢出,默认的线程栈空间大小是1M

  • 线程是由操作系统的CPU进行调度,因此并发多线程执行时CPU需要分配时间片并发执行线程,也就是线程并发执行是需要来回切换CPU的context,严重影响性能

  • 并发环境下,如果创建的线程很多,增加对线程的维护和管理的困难



  • 线程池核心接口与实现类类图

并发编程之线程池原理-LMLPHP

  • Executor & ExecutorService接口核心方法

// Executor.java
// 从线程池中分配一个线程执行任务

void execute(Runnable command);

// ExecutorService.java
// 关闭线程池,会将已提交的任务执行完毕再关闭线程池,不接受新的提交任务
void shutdown();

// 立即关闭,不论是否有任务正在执行还是已提交未执行,都立即退出jvm不再执行,

// 返回已提交未执行的任务
List<Runnable> shutdownNow();

// 线程池是否关闭
boolean isShutdown();

// 线程池调用shutdown(),所有任务是否已经执行完成
boolean isTerminated();

// 不论是线程中断,超时还是线程池shutdown()发生,将会阻塞直到所有任务执行完成
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

// 提交Callable任务返回Future,用于获取Callable执行结果
<T> Future<T> submit(Callable<T> task);

// 提交Runable任务,并返回Future对象,同时将执行结果传入result
<T> Future<T> submit(Runnable task, T result);

// 提交提交Runable任务,并返回Future对象,执行结果为null
Future<?> submit(Runnable task)

// 执行一系列的任务,并返回对应的Future集合对象,同时Future包含task的执行结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 执行一系列任务,其中任意一个执行成功将返回Future对象并取消其他任务的执行,

// Future包含成功执行task的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException
;

// 主要用于处理定时任务
//ScheduledExecutorService.java

// 创建并执行一个一次性定时任务,指定在 delay&unit(时长+时间单位) 时间点将会执行

public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit)



// 下面两个方法主要是创建并执行一个周期性任务,如果发生异常会退出程序
// 区别在于
// scheduleAtFixedRate执行周期任务之后,如果任务超过指定的周期时间,下一个任务会立刻执行不会等待
// scheduleWithFixedDelay执行任务超过周期时间,仍然会等待delay时间再进行下一个任务的执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit)
;

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

  • 线程池与线程池工具类图关系

并发编程之线程池原理-LMLPHP


  

// Executors.java
// 创建一个基于无界的链表阻塞队列(LinkedBlockingQueue)且为固定线程数为nThreads的线程池

// 无界: 队列没有指定容器大小,可以不断添加元素

public static ExecutorService newFixedThreadPool(int nThreads) {}
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory)
{}


// 创建一个基于无界的同步队列(SynchronousQueue)且线程数无限制(0 - Integer.MAX_VALUE)的线程池
// 1.该线程池核心线程数为0且线程空闲时间为60s,意味着任务执行完成或者是超时将会销毁线程
// 2.不存储数据的同步队列(最多只有一个元素的队列),仅作为缓存作用
// 即等待内部创建工作线程完成之后就立即交由线程进行消费
// 3.适用于处理任务但是不确定线程个数的场景,同时为了防止不断创建线程造成CPU资源消耗过多
// 一般会自定义添加对应的线程最大数而不是Integer.MAX_VALUE
public static ExecutorService newCachedThreadPool() {}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {}


// 创建一个基于延迟的无界任务队列且能够执行定时任务的线程池
// 线程池的核心数量corePoolSize,最大线程数为Integer.MAX_VALUE
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
ThreadFactory threadFactory)


// 创建一个基于延迟的无界任务队列,能够执行定时任务且只有一个线程的线程池
// 当该线程池中的线程被中断或者异常退出的时候,线程池会新创建一个线程继续执行后续的任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
{}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory){}

  • Executors包含的组件

并发编程之线程池原理-LMLPHP

  • ThreadPoolExecutor类下核心组件

  • RejectedExecutionHandler策略

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{

}

  • execute方法源代码


  
  • execute核心执行流程

并发编程之线程池原理-LMLPHP

  • execute核心流程小结

并发编程之线程池原理-LMLPHP

// 存储线程池状态以及工作线程个数(RUNNING, 0)
// rs表示状态,wc表示工作线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 状态存储在对应数值的高三位
// 接收新任务并能处理队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不再接收新任务但能处理队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 将会在处理任务过程被中断,不再接收和处理队列的任务
private static final int STOP = 1 << COUNT_BITS;
// 所有任务已经执行完毕,工作线程个数为0且在调用terminated()之前的状态
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()执行之后的状态
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
// 获取当前线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取当前线程池的工作线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

并发编程之线程池原理-LMLPHP


  • getLargestPoolSize(): 返回核心线程池中当前最大的线程数,与workers的集合大小一致,表示当前线程池中的线程数量

// 核心代码
addWorker(Runnable firstTask, boolean core){
// ..
// 自旋检测线程个数与队列容量,corePool,maxPool的比较
mainLock.lock();
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// ...
mainLock.unlock();
}
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}


  

  

  
  • getCompletedTaskCount(): 获取当前线程池中已经完成的任务个数(线程池完成的任务 + 工作线程完成的任务)

public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}

并发编程之线程池原理-LMLPHP

老铁们关注走一走,不迷路


本文分享自微信公众号 - 疾风先生(Gale2Writing)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

09-10 11:34
查看更多