并发编程-线程池核心原理
一、线程池的业务场景应用
1.1、异步处理
举个栗子,比如发送邮件,需要找smtp服务器,发送短信,需要找四大运营商。将这种允许延迟看到效果,甚至即便失败的也ok的任务,搞成异步的。
一般在项目完成这种操作的时候,咱们不会自己写线程池。直接SpringBoot的@Async就ok了。
这种SpringBoot的@Async的本质,还是将任务投递给线程池处理,只不过任务用的线程池你没关注。
是可以给异步处理的任务追加一个合理的线程池,从而提升效率
@Component
public class TestAsync {
static AtomicInteger atomicInteger = new AtomicInteger(1);
@Bean
public Executor taskPool(){
ExecutorService pool = Executors.newFixedThreadPool(100, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("task-" + atomicInteger.getAndIncrement());
return t;
}
});
return pool;
}
@Async(value = "taskPool")
public void task() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + ",发送邮件");
}
}
1.2、并行IO处理
项目非常常见的一个情况,比如一个业务处理,需要走三个服务拿到数据,最后整合的情况,可以合理的实现线程池做多个IO的并行处理,提升效率。
一般都是CountDownLatch + ThreadPoolExecutor去实现。
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);
public Object test1() {
CountDownLatch countDownLatch = new CountDownLatch(3);
// 时间 = MAX(商品,库存,策略) = 0.8s
// 调用商品服务 - 拿点数据 0.5s
executor.execute(() -> {
Object itemData = itemFeignClient.getData();
countDownLatch.countDown();
});
// 调用库存服务 - 拿点数据 0.8s
executor.execute(() -> {
Object stockData = stockFeignClient.getData();
countDownLatch.countDown();
});
// 调用策略服务 - 拿点数据 0.7s
executor.execute(() -> {
Object strategyData = strategyFeignClient.getData();
countDownLatch.countDown();
});
try {
countDownLatch.await(1500, TimeUnit.MILLISECONDS);
// 业务线程在这汇总 0.2s
Object result = itemData + stockData + strategyData;
return result;
} catch (InterruptedException e) {
e.printStackTrace();
// 超时
throw new RuntimeException("超时啦~");
}
}
1.3、 其他框架底层的线程池
RabbitMQ,消费者的问题。RabbitMQ默认情况下,消费者是单线程消费!
一种方式,是基于RabbitMQ的配置,prefetch以及maxconcurrent去指定每次拉取消息的数量以及最大并行消费的线程数。
这里是不是也要基于消费者的业务逻辑去考虑线程池的配置。
OpenFeign,底层也需要你去指定线程池的信息,不然OpenFeign慢的要死(新版本我不太清楚)
Tomcat,默认200个核心线程。
二、线程池核心属性&参数扫盲
AtomicInteger ctl
- 高3位:线程池的状态
- RUNNING:一切正常,干活!
- SHUTDOWN:接的活都能干,不接新活!
- STOP:什么活都不能干了!
- TIDYING:线程池准备倒闭!
- TERMINATED:线程池倒闭了!
- 第29位:工作线程的数量(工作线程,就是线程池中new的Thread然后start起来的那个线程)
7个参数
- 核心线程数
- 最大线程数(核心线程数 + 非核心线程数)
- 最大空闲时间
- 空间时间单位
- 阻塞队列
- 线程工厂
- 拒绝策略
三、线程池提交任务流程
3.1 构建好的线程池内部,有多少个工作线程?
0个,工作线程是懒加载,随着任务的提交才会构建。
3.2 任务提交到线程池的几种处理方式?(线程池的执行流程/原理)
- 工作线程数 < 核心线程数: 直接构建核心线程来处理当前任务。只要满足前面的条件,就直接构建核心线程,不存在其他工作线程是否空闲。
- 工作线程数 ≥ 核心线程数: 将任务投递到阻塞队列,空闲的工作线程都会来当前的阻塞队列获取任务处理。
- 当任务丢到阻塞队列之后,可能会出现没有工作线程的情况。 此时会构建一个非核心线程去处理阻塞队列中的任务,避免任务长时间在阻塞队列中不被处理!
- 阻塞队列已满: 会创建非核心线程来处理刚刚投递过来的任务。等刚刚投递过来的任务处理完,才会去阻塞队列找之前排队的任务处理。
- 工作线程数 ≥ 最大线程数 && 阻塞队列已满: 执行拒绝策略。
3.3 核心线程与非核心线程的区别?
没有区别!
线程池构建线程,也是基于Thread t = new Thread()去构建,虽然在构建前,有一些判断上的不同,但是构建出来的都是线程,不分核心还是非核心!
线程池最终只确保数量满足参数要求即可。无论线程构建之初,被认定为核心还是非核心,到后面都是工作线程,而且线程池再多维护一个工作线程的属性,很麻烦,而且对效率有影响。
四、线程池后续处理任务流程
线程池中的工作线程,在处理完手头的任务之后,都会去阻塞队列中尝试拿新任务。在尝试的过程中,工作线程处于阻塞状态。
尝试的时间略有不同。
- 有的工作线程会死等,没任务,我就死等,等到有任务。 (核心线程。 工作线程没有超过核心线程数时,都是死等! ) WAITING,take方法。
- 有的工作线程会等待最大空闲时间,这段时间没拿到任务,这个线程就销毁。(非核心线程。 工作线程超过核心线程数之后,都是等一会! )TIMED_WAITING,poll(最大空闲时间)方法。
有的工作线程没拿到任务就销毁?怎么销毁的?
无论是线程池的线程,还是自己new的Thread都是同一种线程。都是start起来的。
销毁方式很简单,run方法结束。
线程池也是一样的操作,让run方法结束即可,而结束的方式,是让runWorker方法里的while循环结束。(工作线程可以一直干活的原因,就是在一个while循环中不停的去阻塞队列获取任务)
- 当没有从阻塞队列在指定的时间范围内,拿到数据,线程会销毁。(非核心线程)
- 等待任务的线程状态,处于WAITING或者TIMED_WAITING,只要线程在这个状态下中断了,就会被立即唤醒,没有拿到任务,这种也会直接被销毁。
原理一样,都是在阻塞队列中尝试获取任务,但是最后没拿到,就被要干掉,干掉的方式,就是run方法结束,线程销毁。
问题:线程池中线程回收的问题?
当咱们在局部位置,方法内部构件一个线程池去处理任务后,即便方法弹栈了,线程池引用没了,工作线程依然无法被正常回收,严重化会导致OOM问题。
在方法内部使用完毕线程池后,一定要基于执行shutdown方法,先让线程池将工作线程回收,否则存活的工作线程会让线程池对象无法被GC回收。
线程池构建的线程,属于GCRoot的一种。
同时工作线程的run方法的参数里传入了Worker对象(Worker属于Thread线程的包装),Worker无法被回收。
Worker是线程池的内部类,内部类还在,外部类无法被回收。
工作线程执行的run方法,是栈桢压栈执行,所以run方法里涉及到的内容都是GCRoot,run方法里涉及到了Worker对象的引用 -------> Worker ---------> ThreadPoolExecutor
@GetMapping("/test")
public void test() throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1000,
1000,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);
for (int i = 0; i < 1000; i++) {
executor.execute(() -> {
System.out.println(111);
});
}
Thread.sleep(10000);
}
问题:线程池的核心参数如何设置?
1、如果你项目中有业务涉及到了线程池应用,贴合你的业务去聊。(4C8G)
2、项目中没涉及,以这种方式来聊:
- 核心线程数
- 任务的类型是CPU密集还是IO密集。
- CPU密集:你的线程需要CPU尽可能的去调度他,核心线程数一般就要设置到CPU内核数±1。
- IO密集:很多书上有一些IO密集时,核心线程数设置的公式,但是实际的情况和公式还有出入的,IO密集的时间不确定的。这里一定要基于压测的形式,找到一个合适的中间值,在短时间内可以按照任务的RT时间以及任务数来决定核心线程数的大小。
- 任务的类型是CPU密集还是IO密集。
- 阻塞队列长度
- 考虑任务处理允许的延迟时间,基于每秒任务大致多少的数量,来预测队列最长时,任务会延迟多久。
- JVM内存问题,如果大量的任务扔到阻塞队列中,占用的内存也不小,尽可能不要占用太多。
- 最大线程数
- 强烈推荐就跟核心线程数的大小设置为一样的。如果设置的超过核心线程数,反而会造成线程池处理任务的效率降低,因为CPU频繁的上下文切换,导致浪费CPU资源,这个是完全没有必要的。
- 拒绝策略
- 这个只能贴近业务去聊,最基本比如记录日志,丢就丢了,采用discard都可以接受。
- 一些任务要求必须处理完,设置为CallerRuns都可以接收。
- 或者Abort抛出异常也成,日志有记录的信息,会知道线程池处理的能力不足。
- 任务允许延迟,但是又不能影响当前效率,记录信息扔到数据库,后面定时处理。