最近在项目中遇到一个需要用线程池来处理任务的需求,于是我用ThreadPoolExecutor来实现,但是在实现过程中我发现提交大量任务时它的处理逻辑是这样的(提交任务还有一个submit方法内部也调用了execute方法):

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

注释中已经写的非常明白:

  1. 如果线程数量小于corePoolSize,直接创建新线程处理任务
  2. 如果线程数量等于corePoolSize,尝试将任务放到等待队列里
  3. 如果等待队列已满,尝试创建非核心线程处理任务(如果maximumPoolSIze > corePoolSize

但是在我的项目中一个线程启动需要10s左右的时间(需要启动一个浏览器对象),因此我希望实现一个更精细的逻辑提升资源的利用率:

  1. 线程池保持corePoolSize个线程确保有新任务到来时可以立即得到执行
  2. 当没有空闲线程时,先把任务放到等待队列中(因为开启一个线程需要10s,所以如果在等待队列比较小的时候,等待其他任务完成比等待新线程创建更快)
  3. 当等待队列的大小大于设定的阈值threshold时,说明堆积的任务已经太多了,这个时候开始创建非核心线程直到线程数量已经等于maximumPoolSize
  4. 当线程数量已经等于maximumPoolSize,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务)
  5. 长时间空闲后退出非核心线程回收浏览器占用的内存资源

当我研究了常见的CachedThreadPoolFixedThreadPool以及尝试自己配置ThreadPoolExecutor的构造函数后,发现无论如何都不能实现上面提到的逻辑,因为默认的实现只有在workQueue达到容量上限后才会开始创建非核心线程,因此需要通过继承的方法实现一个新的类来完成需求。

怎么实现在workQueue到达容量上限前就创建非核心线程?还要回顾下execute函数的代码

					//尝试将任务插入等待队列,如果返回false
					//说明队列已经到达容量上限,进入else if逻辑
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//尝试创建非核心线程
else if (!addWorker(command, false))
    reject(command);

那么只要改变workQueue.offer()的逻辑,在线程数量还小于maximumPoolSize的时候就返回false拒绝插入,让线程池调用addWoker,等不能再创建更多线程时再允许添加到队列即可。

可以通过子类重写offer方法来实现添加逻辑的改变

@Override
public boolean offer(E e) {
    if (threadPoolExecutor == null) {
        throw new NullPointerException();
    }
    //当调用该方法时,已经确定了workerCountOf(c) > corePoolSize
    //当数量小于threshold,在队列里等待
    if (size() < threshold) {
        return super.offer(e);
	//当数量大于等于threshold,说明堆积的任务太多,返回false
	//让线程池来创建新线程处理
    } else {
        //此处可能会因为多线程导致错误的拒绝
        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
            return false;
        //线程池中的线程数量已经到达上限,只能添加到任务队列中
        } else {
            return super.offer(e);
        }
    }
}

这样就实现了基本实现了我需要的功能,但是在写代码的过程中我找到了一个可能出错的地方:ThreadPoolExecutor线程安全的,那么重写的offer方法也可能遇到多线程调用的情况

//设想当poolSize = maximumPoolSize-1时,两个任务到达此处同时返回false
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	return false;
}

由于添加到队列返回falseexecute方法进入到else if (!addWorker(command, false))

if (isRunning(c) && workQueue.offer(command)) {
	int recheck = ctl.get();
	if (! isRunning(recheck) && remove(command))
		reject(command);
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
}
//添加到队列失败后进入addWorker方法中
else if (!addWorker(command, false))
	reject(command);
}

再来看一下addWorker方法的代码,这里只截取需要的一部分

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    	//两个线程都认为还可以创建再创建一个新线程
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
        //两个线程同时调用cas方法只有一个能够成功
        //成功的线程break retry;进入后面的创建线程的逻辑
        //失败的线程重新回到上面的检查并返回false
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

最终,在竞争中失败的线程由于addWorker方法返回了false最终调用了reject(command)。在前面写的要实现的逻辑里提到了,只有在等待队列容量达到上限无法再插入时才拒绝任务,但是由于多线程的原因,这里只是超过了threshold但没有超过capacity的时候就拒绝任务了,所以要对拒绝策略的触发做出修改:第一次触发Reject时,尝试重新添加到任务队列中(不进行poolSize的检测),如果仍然不能添加,再拒绝任务
这里通过对execute方法进行重写来实现重试

@Override
public void execute(Runnable command) {
    try {
        super.execute(command);
    } catch (RejectedExecutionException e) {
    	/*
    	这里参考源码中将任务添加到任务队列的实现
    	但是其中通过(workerCountOf(recheck) == 0)
    	检查当任务添加到队列后是否还有线程存活的部分
    	由于是private权限的,无法实现类似的逻辑,因此需要做一定的特殊处理
		if (isRunning(c) && workQueue.offer(command)) {
		     int recheck = ctl.get();
		     if (! isRunning(recheck) && remove(command))
		         reject(command);
		     else if (workerCountOf(recheck) == 0)
		         addWorker(null, false);
		 }
		*/
        if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
            if (this.isShutdown() && remove(command))
                //二次检查
                realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失败,队列已经满了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}

这里有两个小问题:

  1. 初始化线程池传入的RejectedExecutionHandler不一定会抛出异常(事实上,ThreadPoolExecutor自己实现的4中拒绝策略中只有AbortPolicy能够抛出异常并被捕捉到),因此需要在初始化父类时传入AbortPolicy拒绝策略并将构造函数中传入的自定义拒绝策略保存下来,在重试失败后才调用自己的rejectedExecution
  2. corePoolSize = 0 的极端情况下,可能出现一个任务刚被插入队列的同时,所有的线程都结束任务然后被销毁了,此使这个被加入的任务就无法被执行,在ThreadPoolExecutor中是通过
    else if (workerCountOf(recheck) == 0)
    	addWorker(null, false);
    
    在添加后再检查工作线程是否为0来确保任务可以被执行,但是其中使用的方法是私有的,无法在子类中实现类似的逻辑,因此在初始化时只能强制corePoolSize至少为1来解决这个问题。

全部代码如下

public class MyThreadPool extends ThreadPoolExecutor {

    private RejectedExecutionHandler realRejectedExecutionHandler;

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity) {
        this(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                queueCapacity,
                new AbortPolicy());
    }

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity,
                        RejectedExecutionHandler handler) {
        super(corePoolSize == 0 ? 1 : corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new MyLinkedBlockingQueue<>(queueCapacity),
                new AbortPolicy());
        ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this);
        realRejectedExecutionHandler = handler;
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
                if (this.isShutdown() && remove(command))
                    //二次检查
                    realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失败,队列已经满了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}


public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private int threshold = 20;

    private ThreadPoolExecutor threadPoolExecutor = null;

    public MyLinkedBlockingQueue(int queueCapacity) {
        super(queueCapacity);
    }

    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
	public boolean offer(E e) {
	    if (threadPoolExecutor == null) {
	        throw new NullPointerException();
	    }
	    //当调用该方法时,已经确定了workerCountOf(c) > corePoolSize
	    //当数量小于threshold,在队列里等待
	    if (size() < threshold) {
	        return super.offer(e);
		//当数量大于等于threshold,说明堆积的任务太多,返回false
		//让线程池来创建新线程处理
	    } else {
	        //此处可能会因为多线程导致错误的拒绝
	        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	            return false;
	        //线程池中的线程数量已经到达上限,只能添加到任务队列中
	        } else {
	            return super.offer(e);
	        }
	    }
	}

    public boolean offerWithoutCheck(E e) {
        return super.offer(e);
    }
}

最后进行简单的测试

corePoolSize:2
maximumPoolSize:5
queueCapacity:10
threshold:7
任务2
线程数量:2
等待队列大小:0
等待队列大小小于阈值,继续等待。
任务3
线程数量:2
等待队列大小:1
等待队列大小小于阈值,继续等待。
任务4
线程数量:2
等待队列大小:2
等待队列大小小于阈值,继续等待。
任务5
线程数量:2
等待队列大小:3
等待队列大小小于阈值,继续等待。
任务6
线程数量:2
等待队列大小:4
等待队列大小小于阈值,继续等待。
任务7
线程数量:2
等待队列大小:5
等待队列大小小于阈值,继续等待。
任务8
线程数量:2
等待队列大小:6
等待队列大小小于阈值,继续等待。
任务9
线程数量:2
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,创建新线程处理。
任务10
线程数量:3
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,创建新线程处理。
任务11
线程数量:4
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,创建新线程处理。
任务12
线程数量:5
等待队列大小:7
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务13
线程数量:5
等待队列大小:8
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务14
线程数量:5
等待队列大小:9
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务15
线程数量:5
等待队列大小:10
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
队列已满
任务16
线程数量:5
等待队列大小:10
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
队列已满

再重新复习一遍要实现的功能:

  1. 线程池保持corePoolSize个线程确保有新任务到来时可以立即得到执行
  2. 当没有空闲线程时,先把任务放到等待队列中(因为开启一个线程需要10s,所以如果在等待队列比较小的时候,等待其他任务完成比等待新线程创建更快)
  3. 当等待队列的大小大于设定的阈值threshold时,说明堆积的任务已经太多了,这个时候开始创建非核心线程直到线程数量已经等于maximumPoolSize
  4. 当线程数量已经等于maximumPoolSize,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务)
  5. 长时间空闲后退出非核心线程回收浏览器占用的内存资源

可以看出,线程池运行的逻辑和要实现的目标是相同的。

04-29 04:13