第一.线程的含义
第二.线程池的优势
第三.线程池的API
只需要直接使用Executors 的工厂方法,就可以使用线程池:
public class ExecutorsTest implements Runnable{
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
service.execute(new ExecutorsTest());
}
service.shutdown();
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
public class ExecutorsTest implements Runnable{
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
service.execute(new ExecutorsTest());
}
service.shutdown();
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
public class ExecutorsTest implements Runnable{
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
service.execute(new ExecutorsTest());
}
service.shutdown();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public class ExecutorsTest implements Runnable{
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 100; i++) {
scheduledExecutorService.schedule(new ExecutorsTest(), 3,TimeUnit.SECONDS);
}
scheduledExecutorService.shutdown();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
第四ThreadpoolExecutor
4.1线程池的参数介绍
ThreadpoolExecutor的参数详细介绍:
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
corePoolSize:线程池中的核心线程数
1在创建线程之后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
2当核心线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize :线程池能够容纳同时执行的最大线程数,此值必须大于等于1;
keepAliveTime :多余的空闲线程的存活时间,当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁知道剩下corePoolSize个线程为止;
默认情况下:
只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。
unit:keepAliveTime 的单位
workQueue :阻塞队列,被提交但尚未被执行的任务。
threadFactory :表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可。
handler:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize )时如何拒绝ruannble的策略
newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过了核心线程数后,任务都会放在缓存队列中。
另外KeepAliveTime为0,
也就是超出核心线程数量以外的线程空余存活时间,而这里选用阻塞队列是LinkedBlockingQueue,使用的是Integer.MAX_VALUE,
相当于没有上限这个线程的执行任务流程如下:
1、当线程数小于核心线程数,也就是设置的线程数时,新建线程执行任务;
2、当线程数等于核心线程数后,将任务放在阻塞队列
3、由于队列容量非常大,可以一直添加
4、执行完任务的线程反复去队列中取任务执行
用途:newFixedThreadPool用于负载比较大的服务器,为了资源的合理利用,需要限制当前数量
newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序(FIFO优先级)执行
newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个可缓存的线程池,如果线程池长度超过处理需要,可以灵活的回收空闲线程若无回收,则新建线程;并且没有核心线程,非核心线程,非核心线程数无上限,
但是每个空闲的时间只有60s,超过后就会被回收。
它的执行流程如下:
1、没有核心线程,直接向SynchronousQueue中提交任务。
2、如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个。
3、执行完任务的线程有60s生存时间,如果在这个时间能够接收到任务就可以存活下去,否则就会被回收
4.2线程池的原理解析
一个案例图:
提交一个任务到线程池中,线程池的处理流程如下:
1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。
2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
4.3线程池源码分析
4.3.1、public void execute(Runnable command)
execute的作用就是提交任务command到线程池执行,用户线程提交到任务到线程池的模型图:
该图中可以看到ThreadPoolExecutor的实现实际是一个生产消费模型,当用户添加一个任务到线程池中相当于生产者生产元素,workers线程工作集中的线程任务或者从任务
队列里面获取任务时则相当于消费者消费元素。
public void execute(Runnable command) {
//1、如果任务为null,则抛出异常
if (command == null)
throw new NullPointerException();
//2、获取当前线程池到状态+线程个数变量的组合值
int c = ctl.get();
//3、当前线程数比核心线程数小,就创建线程池
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//4、核心线程池数已经满了,任务队列未满,添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//5、检查当前状态是否是RUNING状态,如果不是则删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//6、如果当前线程池为空,则新建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//7、如果队列满则新增线程,新增失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
ctl的作用:在线程池中,贯穿在线程池整个生命周期中,它是一个原子类,主要作用是用来保存线程状态和线程数量
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
是用位运算,调用了ctlOf()方法
它是一个原子类,主要作用是用来保存线程数量和线程池的状态。我们来分析一下这段代码,其实比较有意思,他用到了位运算一个 int 数值是 32 个 bit 位,
这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。
我们来分析默认情况下,也就是 ctlOf(RUNNING)运行状态,调用了 ctlOf(int rs,int wc)方法;其中
private static int ctlOf(int rs, int wc) { return rs | wc; } 其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位. -1 的二进制是
32 个 1(1111 1111 1111 1111 1111 1111 1111 1111)
-1的二进制计算方法原码是 1000…001 . 高位1表示符号位。然后对原码取反,高位不变得到 1111…110 然后对反码进行+1 ,也就是补码操作, 最后得到1111…1111
那么-1 <<左移 29 位, 也就是 【111】 表示; rs | wc 。二进制的 111 | 000 。得到的结果仍然是 111
那么同理可得其他的状态的 bit 位表示
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//将1的二进制向右位移29位,再减1表示最大线程容量
//运行状态保存在int值的高3位 (所有数值左移29位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任务,并执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任务,但是执行队列中的任务
private static final int STOP = 1 << COUNT_BITS;// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
private static final int TIDYING = 2 << COUNT_BITS; //所有的任务都已结束,线程数量为0,处于该状态的线程池即将调用terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法执行完成
状态变化
4.3.2private boolean addWorker(Runnable firstTask, boolean core)
主要工作就是新增工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:goto语句避免死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果线程处于非运行状态,且rs!=SHUTDOWN且fritTask不为空,且队列不为null,直接返回false
1、线程池已经SHUTDOWN之后,还要添加任务,拒绝
2、SHUTDOWN状态不接受新的任务,拒绝任务,所以当进入SHUTDOWN状态,而传进来的任务为null,并且队列不为null,
这个时候是可以添加新线程的,如果这个条件去反,就表示不允许添加新的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自旋
for (;;) {
//获取worker工作线程数
int wc = workerCountOf(c);
//如果工作线程数的大小大于默认的线程数或者核心线程数和最大线程数,则返回false,表示不再添加worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通过CAS来增加工作线程数,若新增失败,则重试
if (compareAndIncrementWorkerCount(c))
break retry;
//重新获取ctl
c = ctl.get();
//如果这里不相等,这线程状态已经发生了变化,继续重试
if (runStateOf(c) != rs)
continue retry;
}
}
//工作线程是否已经启动成功
boolean workerStarted = false;
//工作线程是否添加成功
boolean workerAdded = false;
Worker w = null;
try {
//构建一个worker传入的了一个Runaable对象
w = new Worker(firstTask);
//从worker中取出对象
final Thread t = w.thread;
if (t != null) {
//使用锁避免并发问题
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//添加任务
workers.add(w);
int s = workers.size();
//工作的线程数大于最大线程数,表示这个最大线程数曾经出现过的最大线程数
if (s > largestPoolSize)
//更新出现过的最大线程数
largestPoolSize = s;
//创建工作线程成功
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//添加任务成功,则开启线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果添加失败了,就递减实际工作线程数
addWorkerFailed(w);
}
return workerStarted;
}
4.3.3Worker类
1. 每个 worker,都是一条线程,同时里面包含了一个 firstTask,即初始化时要被首先执行的任务. 2. 最终执行任务的,是 runWorker()方法 Worker 类继承了 AQS,并实现了 Runnable 接口,注意其中的 firstTask 和 thread 属性: firstTask 用它来保存传入的任务;thread 是在调用构造方法时通过 ThreadFactory 来创建的 线程,是用来处理任务的线程。在调用构造方法时,需要传入任务,这里通过 getThreadFactory().newThread(this);来新建一个线程,
newThread 方法传入的参数是 this,因为 Worker 本身继承了 Runnable 接口,也就是一个线程,所以一个 Worker 对象在启动的时候会调用 Worker 类中的
run 方法。Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:
lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;那么它会有以下几个作用 1. 如果正在执行任务,则不应该中断线程; 2. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断; 3. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers
方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态 4. 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;//这个是执行task的线程,从构造函数可知是由ThreadFactury创建的
Runnable firstTask;//这个就是需要执行的task
volatile long completedTasks;//完成的任务数,用线程统计
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
4.3.4addWorkerFailed
在addWorked中如果添加线程失败,会对添加失败进行处理
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
4.3.5runWork方法
worker简单的理解其实就是一个线程,里面重新了run方法,这块线程池执行任务的真正处理逻辑就是这个runwork方法了,这个方法主要做这个几件事情:
1、如果task不为null,则开始执行task
2、如果 task为null,则根据getTask获取线程,如果获取到Runnable不为null,则执行该任务
3、执行完毕后,通过while循环继续getTask()获取任务
4、如果getTask()取到的任务依然是空的,那么整个runwork()方法执行完毕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //表示当前worker线程允许被中断,因为new work默认的state=-1,此处调用worker类的tryRelease()方法,
将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
boolean completedAbruptly = true;
try {
//这里实现了线程复用,如果task为null,则根据getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();//上锁,不是为了解决并发问题,为了在shutDwon()时不终止正在运行的worker
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try {
//这里默认是没有实现的,在特定场景下,我们可以自己继承ThreadpoolExecutor自己重写 beforeExecute(wt, task); Throwable thrown = null; try {
//执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally {
//默认是没有实现的 afterExecute(task, thrown); } } finally {
//置空任务(下次循环时任务依然为null,需要时在通过getTask获取)+记录该wroker完成的任务量+解锁 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally {
//将入参worker从数组中删掉,根据布尔值allowCoreThreadTimeOut来决定是否补充新的worker进数组workers
processWorkerExit(w, completedAbruptly); } }
4.3.6getTask方法
worker线程会从阻塞队列中获取需要执行的任务
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) {//自旋 int c = ctl.get(); int rs = runStateOf(c); //对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//time变量用于判断是否需要进行超时控制
//allowCoreThreadTimeOut默认为false,也就是核心线程不允许进行超时,
//wc>corePoolSize表示当前线程池的线程数大于核心线程数量
//对于这些超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数量超过了maxinumPoolSize可能是线程池也在运行时被调用了setMaxinumPoolSize()被改变了大小,
否则已经addWoker()成功不会超时maxinumPoolSize()
//timed和timeOut如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时,其实
就是体现了空闲线程的存活时间
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;//返回为null线程worker会自动退出的
continue;
}
try {
//根据timed来判断,如果为true,则通过阻塞队列poll方法进行超时控制,如果在keepAliveTime时间内没有获取任务,则返回为null。
否则通过take方法阻塞式获取队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)//如果拿到的队列不为null,则执行返回给worker处理
return r;
timedOut = true;//如果r等于空,说明已经超时了,设置timeout为true,在下次自旋的时候进行回收
} catch (InterruptedException retry) {
timedOut = false;//如果获取任务时当前线程发生了中断,则设置timeout为false并返回循环重试
}
}
}
4.3.7拒绝策略
等待队列满了,再也不塞下新任务了,同时,线程池中max线程也达到了,无法继续为新任务服务。
这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK拒绝策略:
- AbortPolicy(默认):直接抛出RejectExecutionException异常阻止系统正常运行。
- CallerRunsPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许丢失,这是最好的一种解决方案。
4.3.8自定义线程池
代码案例:
public class Test002 {
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(17,
17,1L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 50; i++) {
int finalI = i;
pool.execute(()->{
System.out.println(Thread.currentThread().getName()+":"+ finalI);
});
}
}
}
4.3.9合理的配置线程池
CPU密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为CPU总的运算能力就那些。
CPU密集型任务配置尽可能少的线程数量:
一般公式:
CPU核数+1个线程的线程池
IOP密集型
1、由于IO密集型任务线程并不是一直都在执行任务,则应该配置尽可能多的线程,如CPU核数*2
2、IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
IO密集型,大部分线程都阻塞,故需要多配置线程数:
参考公式:
CPU核数/1-阻塞系数 阻塞系数在0.8-0.9之间
比如8核的CPU: 8/1-0.9 =80个线程数