在并发大量异步任务处理程序中,每执行一个任务就需要创建一个线程,同时任务执行完毕之后需要将线程销毁.我们知道JVM创建线程的时候需要为其分配线程栈空间以及一些初始化操作,同时销毁的过程需要回收线程栈空间并由gc释放资源,期间都需要耗费一定的时间,因此一个任务的最终执行时间=创建线程newTime + 程序执行excuteTime + 线程销毁的gcTime,如果期间newTime+gcTime > excuteTime,那么这时候执行任务创建线程在程序中就显得十分不划算
线程执行需要JVM分配线程栈空间,需要从系统内存申请,如果创建的线程过多,那么就容易造成内存空间不够用导致内存溢出,默认的线程栈空间大小是1M
线程是由操作系统的CPU进行调度,因此并发多线程执行时CPU需要分配时间片并发执行线程,也就是线程并发执行是需要来回切换CPU的context,严重影响性能
并发环境下,如果创建的线程很多,增加对线程的维护和管理的困难
线程池核心接口与实现类类图
Executor & ExecutorService接口核心方法
// Executor.java
// 从线程池中分配一个线程执行任务
void execute(Runnable command);
// ExecutorService.java
// 关闭线程池,会将已提交的任务执行完毕再关闭线程池,不接受新的提交任务
void shutdown();
// 立即关闭,不论是否有任务正在执行还是已提交未执行,都立即退出jvm不再执行,
// 返回已提交未执行的任务
List<Runnable> shutdownNow();
// 线程池是否关闭
boolean isShutdown();
// 线程池调用shutdown()后,所有任务是否已经执行完成
boolean isTerminated();
// 不论是线程中断,超时还是线程池shutdown()发生,将会阻塞直到所有任务执行完成
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 提交Callable任务返回Future,用于获取Callable执行结果
<T> Future<T> submit(Callable<T> task);
// 提交Runable任务,并返回Future对象,同时将执行结果传入result
<T> Future<T> submit(Runnable task, T result);
// 提交提交Runable任务,并返回Future对象,执行结果为null
Future<?> submit(Runnable task)
// 执行一系列的任务,并返回对应的Future集合对象,同时Future包含task的执行结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 执行一系列任务,其中任意一个执行成功将返回Future对象并取消其他任务的执行,
// Future包含成功执行task的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 主要用于处理定时任务
//ScheduledExecutorService.java
// 创建并执行一个一次性定时任务,指定在 delay&unit(时长+时间单位) 时间点将会执行
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit)// 下面两个方法主要是创建并执行一个周期性任务,如果发生异常会退出程序
// 区别在于
// scheduleAtFixedRate执行周期任务之后,如果任务超过指定的周期时间,下一个任务会立刻执行不会等待
// scheduleWithFixedDelay执行任务超过周期时间,仍然会等待delay时间再进行下一个任务的执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
线程池与线程池工具类图关系
// Executors.java
// 创建一个基于无界的链表阻塞队列(LinkedBlockingQueue)且为固定线程数为nThreads的线程池
// 无界: 队列没有指定容器大小,可以不断添加元素
public static ExecutorService newFixedThreadPool(int nThreads) {}
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory){}
// 创建一个基于无界的同步队列(SynchronousQueue)且线程数无限制(0 - Integer.MAX_VALUE)的线程池
// 1.该线程池核心线程数为0且线程空闲时间为60s,意味着任务执行完成或者是超时将会销毁线程
// 2.不存储数据的同步队列(最多只有一个元素的队列),仅作为缓存作用
// 即等待内部创建工作线程完成之后就立即交由线程进行消费
// 3.适用于处理任务但是不确定线程个数的场景,同时为了防止不断创建线程造成CPU资源消耗过多
// 一般会自定义添加对应的线程最大数而不是Integer.MAX_VALUE
public static ExecutorService newCachedThreadPool() {}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {}
// 创建一个基于延迟的无界任务队列且能够执行定时任务的线程池
// 线程池的核心数量corePoolSize,最大线程数为Integer.MAX_VALUE
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
ThreadFactory threadFactory)// 创建一个基于延迟的无界任务队列,能够执行定时任务且只有一个线程的线程池
// 当该线程池中的线程被中断或者异常退出的时候,线程池会新创建一个线程继续执行后续的任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory){}
Executors包含的组件
ThreadPoolExecutor类下核心组件
RejectedExecutionHandler策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
}
execute方法源代码
execute核心执行流程
execute核心流程小结
// 存储线程池状态以及工作线程个数(RUNNING, 0)
// rs表示状态,wc表示工作线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 状态存储在对应数值的高三位
// 接收新任务并能处理队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不再接收新任务但能处理队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 将会在处理任务过程被中断,不再接收和处理队列的任务
private static final int STOP = 1 << COUNT_BITS;
// 所有任务已经执行完毕,工作线程个数为0且在调用terminated()之前的状态
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()执行之后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 获取当前线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取当前线程池的工作线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
getLargestPoolSize(): 返回核心线程池中当前最大的线程数,与workers的集合大小一致,表示当前线程池中的线程数量
// 核心代码
addWorker(Runnable firstTask, boolean core){
// ..
// 自旋检测线程个数与队列容量,corePool,maxPool的比较
mainLock.lock();
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// ...
mainLock.unlock();
}
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
getCompletedTaskCount(): 获取当前线程池中已经完成的任务个数(线程池完成的任务 + 工作线程完成的任务)
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
老铁们关注走一走,不迷路
本文分享自微信公众号 - 疾风先生(Gale2Writing)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。