1、线程池的使用场景

2、线程池的关键参数说明

一般情况下我们是通过ThreadPoolExecutor来构造我们的线程池对象的。

* 阿里巴巴的开发规范文档是禁止直接使用Executors静态工厂类来创建线程池的,原因是

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

}

参数说明:

3、线程池的分类

3.1、newCachedThreadPool
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}
3.2、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
3.3、ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
3.4、SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){
return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
3.5、总结
newCachedThreadPool1、线程数无上限
2、空闲线程存活60s
3、阻塞队列
1、任务执行时间短
2、任务要求响应时间短
newFixedThreadPool1、线程数固定
2、无界队列
1、任务比较平缓
2、控制最大的线程数
newScheduledThreadPool核心线程数量固定、非核心线程数量无限制(闲置时马上回收)执行定时 / 周期性 任务
newSingleThreadExecutor只有一个核心线程(保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题)不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作,文件操作等

4、使用线程池容易出现的问题

整个系统影响缓慢,大部分5041、为设置最大的线程数,任务积压过多,线程数用尽
oom1、队列无界或者size设置过大
使用线程池对效率并没有明显的提升1、线程池的参数设置过小,线程数过小或者队列过小,或者是服务器的cpu核数太低

5、线程池的监控

5.1、为什么要对线程池进行监控

5.2、监控的原理

  • 另起一个定时单线程数的线程池newSingleThreadScheduledExecutor

  • 调用scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)定时执行监控任务;

  • 定时任务内 通过ThreadPoolExecutor对象获取监控的对象信息,比如t线程池需要执行的任务数、线程池在运行过程中已完成的任务数、曾经创建过的最大线程数、线程池里的线程数量、线程池里活跃的线程数量、当前排队线程数

  • 根据预设的日志或报警策略,进行规则控制

5.3、实现的细节

定义线程池并启动监控

 /**
* 定义线程池的队列的长度
*/
private final Integer queueSize = 1000;

/**
* 定义一个定长的线程池
*/
private ExecutorService executorService;

@PostConstruct
private void initExecutorService() {
log.info(
"executorService init with param: threadcount:{} ,queuesize:{}",
systemConfig.getThreadCount(),
systemConfig.getThreadQueueSize());
executorService =
new ThreadPoolExecutor(
systemConfig.getThreadCount(),
systemConfig.getThreadCount(),
0,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(systemConfig.getThreadQueueSize()),
new BasicThreadFactory.Builder()
.namingPattern("async-sign-thread-%d")
.build(),
(r, executor) -> log.error("the async executor pool is full!!"));

/** 启动线程池的监控 */
ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring();
threadPoolMonitoring.init();
}

线程池的监控

/**
* 功能说明:线程池监控
*
* @params
* @return <br>
* 修改历史<br>
* [2019年06月14日 10:20:10 10:20] 创建方法by fengqingyang
*/
public class ThreadPoolMonitoring {
/** 用于周期性监控线程池的运行状态 */
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder()
.namingPattern("async thread executor monitor")
.build());

/**
* 功能说明:自动运行监控
*
* @return <br>
* 修改历史<br>
* [2019年06月14日 10:26:51 10:26] 创建方法by fengqingyang
* @params
*/
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
ThreadPoolExecutor threadPoolExecutor =
(ThreadPoolExecutor) executorService;
/** 线程池需要执行的任务数 */
long taskCount = threadPoolExecutor.getTaskCount();
/** 线程池在运行过程中已完成的任务数 */
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
/** 曾经创建过的最大线程数 */
long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
/** 线程池里的线程数量 */
long poolSize = threadPoolExecutor.getPoolSize();
/** 线程池里活跃的线程数量 */
long activeCount = threadPoolExecutor.getActiveCount();
/** 当前排队线程数 */
int queueSize = threadPoolExecutor.getQueue().size();
log.info(
"async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
taskCount,
completedTaskCount,
largestPoolSize,
poolSize,
activeCount,
queueSize);

/** 超过阀值的80%报警 */
if (activeCount >= systemConfig.getThreadCount() * 0.8) {
log.error(
"async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
taskCount,
completedTaskCount,
largestPoolSize,
poolSize,
activeCount,
queueSize);
;
}
} catch (Exception ex) {
log.error("ThreadPoolMonitoring service error,{}", ex.getMessage());
}
},
0,
30,
TimeUnit.SECONDS);
}
}

6、需要注意的事项

01-12 09:24
查看更多