最近在项目中遇到一个需要用线程池来处理任务的需求,于是我用ThreadPoolExecutor
来实现,但是在实现过程中我发现提交大量任务时它的处理逻辑是这样的(提交任务还有一个submit
方法内部也调用了execute
方法):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
注释中已经写的非常明白:
- 如果线程数量小于
corePoolSize
,直接创建新线程处理任务 - 如果线程数量等于
corePoolSize
,尝试将任务放到等待队列里 - 如果等待队列已满,尝试创建非核心线程处理任务(如果
maximumPoolSIze > corePoolSize
)
但是在我的项目中一个线程启动需要10s左右的时间(需要启动一个浏览器对象),因此我希望实现一个更精细的逻辑提升资源的利用率:
- 线程池保持
corePoolSize
个线程确保有新任务到来时可以立即得到执行 - 当没有空闲线程时,先把任务放到等待队列中(因为开启一个线程需要10s,所以如果在等待队列比较小的时候,等待其他任务完成比等待新线程创建更快)
- 当等待队列的大小大于设定的阈值
threshold
时,说明堆积的任务已经太多了,这个时候开始创建非核心线程直到线程数量已经等于maximumPoolSize
- 当线程数量已经等于
maximumPoolSize
,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务) - 长时间空闲后退出非核心线程回收浏览器占用的内存资源
当我研究了常见的CachedThreadPool
、FixedThreadPool
以及尝试自己配置ThreadPoolExecutor
的构造函数后,发现无论如何都不能实现上面提到的逻辑,因为默认的实现只有在workQueue
达到容量上限后才会开始创建非核心线程,因此需要通过继承的方法实现一个新的类来完成需求。
怎么实现在workQueue
到达容量上限前就创建非核心线程?还要回顾下execute
函数的代码
//尝试将任务插入等待队列,如果返回false
//说明队列已经到达容量上限,进入else if逻辑
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//尝试创建非核心线程
else if (!addWorker(command, false))
reject(command);
那么只要改变workQueue.offer()
的逻辑,在线程数量还小于maximumPoolSize
的时候就返回false拒绝插入,让线程池调用addWoker
,等不能再创建更多线程时再允许添加到队列即可。
可以通过子类重写offer
方法来实现添加逻辑的改变
@Override
public boolean offer(E e) {
if (threadPoolExecutor == null) {
throw new NullPointerException();
}
//当调用该方法时,已经确定了workerCountOf(c) > corePoolSize
//当数量小于threshold,在队列里等待
if (size() < threshold) {
return super.offer(e);
//当数量大于等于threshold,说明堆积的任务太多,返回false
//让线程池来创建新线程处理
} else {
//此处可能会因为多线程导致错误的拒绝
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
return false;
//线程池中的线程数量已经到达上限,只能添加到任务队列中
} else {
return super.offer(e);
}
}
}
这样就实现了基本实现了我需要的功能,但是在写代码的过程中我找到了一个可能出错的地方:ThreadPoolExecutor
是线程安全的,那么重写的offer
方法也可能遇到多线程调用的情况
//设想当poolSize = maximumPoolSize-1时,两个任务到达此处同时返回false
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
return false;
}
由于添加到队列返回false
,execute
方法进入到else if (!addWorker(command, false))
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加到队列失败后进入addWorker方法中
else if (!addWorker(command, false))
reject(command);
}
再来看一下addWorker
方法的代码,这里只截取需要的一部分
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
//两个线程都认为还可以创建再创建一个新线程
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//两个线程同时调用cas方法只有一个能够成功
//成功的线程break retry;进入后面的创建线程的逻辑
//失败的线程重新回到上面的检查并返回false
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
最终,在竞争中失败的线程由于addWorker
方法返回了false
最终调用了reject(command)
。在前面写的要实现的逻辑里提到了,只有在等待队列容量达到上限无法再插入时才拒绝任务,但是由于多线程的原因,这里只是超过了threshold
但没有超过capacity
的时候就拒绝任务了,所以要对拒绝策略的触发做出修改:第一次触发Reject
时,尝试重新添加到任务队列中(不进行poolSize
的检测),如果仍然不能添加,再拒绝任务。
这里通过对execute
方法进行重写来实现重试
@Override
public void execute(Runnable command) {
try {
super.execute(command);
} catch (RejectedExecutionException e) {
/*
这里参考源码中将任务添加到任务队列的实现
但是其中通过(workerCountOf(recheck) == 0)
检查当任务添加到队列后是否还有线程存活的部分
由于是private权限的,无法实现类似的逻辑,因此需要做一定的特殊处理
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
*/
if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
if (this.isShutdown() && remove(command))
//二次检查
realRejectedExecutionHandler.rejectedExecution(command, this);
} else {
//插入失败,队列已经满了
realRejectedExecutionHandler.rejectedExecution(command, this);
}
}
}
}
这里有两个小问题:
- 初始化线程池传入的
RejectedExecutionHandler
不一定会抛出异常(事实上,ThreadPoolExecutor
自己实现的4中拒绝策略中只有AbortPolicy
能够抛出异常并被捕捉到),因此需要在初始化父类时传入AbortPolicy
拒绝策略并将构造函数中传入的自定义拒绝策略保存下来,在重试失败后才调用自己的rejectedExecution
。 - 在
corePoolSize = 0
的极端情况下,可能出现一个任务刚被插入队列的同时,所有的线程都结束任务然后被销毁了,此使这个被加入的任务就无法被执行,在ThreadPoolExecutor
中是通过
在添加后再检查工作线程是否为0来确保任务可以被执行,但是其中使用的方法是私有的,无法在子类中实现类似的逻辑,因此在初始化时只能强制else if (workerCountOf(recheck) == 0) addWorker(null, false);
corePoolSize
至少为1来解决这个问题。
全部代码如下
public class MyThreadPool extends ThreadPoolExecutor {
private RejectedExecutionHandler realRejectedExecutionHandler;
public MyThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int queueCapacity) {
this(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
queueCapacity,
new AbortPolicy());
}
public MyThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int queueCapacity,
RejectedExecutionHandler handler) {
super(corePoolSize == 0 ? 1 : corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new MyLinkedBlockingQueue<>(queueCapacity),
new AbortPolicy());
((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this);
realRejectedExecutionHandler = handler;
}
@Override
public void execute(Runnable command) {
try {
super.execute(command);
} catch (RejectedExecutionException e) {
if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
if (this.isShutdown() && remove(command))
//二次检查
realRejectedExecutionHandler.rejectedExecution(command, this);
} else {
//插入失败,队列已经满了
realRejectedExecutionHandler.rejectedExecution(command, this);
}
}
}
}
public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private int threshold = 20;
private ThreadPoolExecutor threadPoolExecutor = null;
public MyLinkedBlockingQueue(int queueCapacity) {
super(queueCapacity);
}
public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public boolean offer(E e) {
if (threadPoolExecutor == null) {
throw new NullPointerException();
}
//当调用该方法时,已经确定了workerCountOf(c) > corePoolSize
//当数量小于threshold,在队列里等待
if (size() < threshold) {
return super.offer(e);
//当数量大于等于threshold,说明堆积的任务太多,返回false
//让线程池来创建新线程处理
} else {
//此处可能会因为多线程导致错误的拒绝
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
return false;
//线程池中的线程数量已经到达上限,只能添加到任务队列中
} else {
return super.offer(e);
}
}
}
public boolean offerWithoutCheck(E e) {
return super.offer(e);
}
}
最后进行简单的测试
corePoolSize:2
maximumPoolSize:5
queueCapacity:10
threshold:7
任务2
线程数量:2
等待队列大小:0
等待队列大小小于阈值,继续等待。
任务3
线程数量:2
等待队列大小:1
等待队列大小小于阈值,继续等待。
任务4
线程数量:2
等待队列大小:2
等待队列大小小于阈值,继续等待。
任务5
线程数量:2
等待队列大小:3
等待队列大小小于阈值,继续等待。
任务6
线程数量:2
等待队列大小:4
等待队列大小小于阈值,继续等待。
任务7
线程数量:2
等待队列大小:5
等待队列大小小于阈值,继续等待。
任务8
线程数量:2
等待队列大小:6
等待队列大小小于阈值,继续等待。
任务9
线程数量:2
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,创建新线程处理。
任务10
线程数量:3
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,创建新线程处理。
任务11
线程数量:4
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,创建新线程处理。
任务12
线程数量:5
等待队列大小:7
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务13
线程数量:5
等待队列大小:8
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务14
线程数量:5
等待队列大小:9
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务15
线程数量:5
等待队列大小:10
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
队列已满
任务16
线程数量:5
等待队列大小:10
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
队列已满
再重新复习一遍要实现的功能:
- 线程池保持
corePoolSize
个线程确保有新任务到来时可以立即得到执行 - 当没有空闲线程时,先把任务放到等待队列中(因为开启一个线程需要10s,所以如果在等待队列比较小的时候,等待其他任务完成比等待新线程创建更快)
- 当等待队列的大小大于设定的阈值
threshold
时,说明堆积的任务已经太多了,这个时候开始创建非核心线程直到线程数量已经等于maximumPoolSize
- 当线程数量已经等于
maximumPoolSize
,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务) - 长时间空闲后退出非核心线程回收浏览器占用的内存资源
可以看出,线程池运行的逻辑和要实现的目标是相同的。