★★★建议星标我们★★★
公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。
2020年Java原创面试题库连载中
(共18篇)
【032期】JavaEE面试题(四)Spring(2)
上一篇讲解了线程池的原理,这篇就在线程池基础上介绍基于线程池实现的定时器ScheduledThreadPoolExecutor:
ScheduledThreadPoolExecutor的用法
ScheduledThreadPoolExecutor源码分析
ScheduledThreadPoolExecutor执行过程分析
1. 介绍
ScheduledThreadPoolExecutor 可以用来在给定延时后执行异步任务或者周期性执行任务,也就是我们说的定时器。ScheduledThreadPoolExecutor基于线程池,通过多线程实现延时和周期执行。
1.1 用法Demo
如下代码使用ScheduledThreadPoolExecutor实现:10ms后打印第一次,之后每隔30ms打印一次。
public class TimerDemo {
1.2 四种定时器用法
public static void main(String[] args) {
// 1. 创建线程池定时器
ScheduledExecutorService timer = Executors.newScheduledThreadPool(3);
// 2. 提交定时任务:10ms后打印第一次,之后每隔30ms打印一次
timer.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(1);
}
}, 10, 30, TimeUnit.MILLISECONDS);
}
}
第一种schedule(Runnable command, long delay, TimeUnit unit);达到给定的延时时间后,执行任务,Runnable不能返回结果;
第二种schedule(Callable callable, long delay, TimeUnit unit); 达到给定的延时时间后,执行任务,Callable可以返回结果;
第三种scheduleAtFixedRate(); 固定周期执行任务,每次执行的开始时间之间的间隔是固定的,最开始就能够确定之后每次执行的时间;
第四种scheduleWithFixedDelay(); 固定延时周期执行任务,上一次执行结束到下一次执行开始的间隔时间是固定的,由于每次执行任务花费时间不一定相同,所以只有在上次执行结束之后才能确定下次执行开始的时间。
/**
* 达到给定的延时时间后,执行任务
* @param command Runnable接口的任务,ScheduledFuture.get()获取结果为null
*/
public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit);
/**
* 到给定的延时时间后,执行任务。
* @param callable 实现Callable接口的任务,ScheduledFuture.get()可获取任务结果
*/
public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
/**
* 固定周期执行任务
* @param initialDelay 第一次执行的延迟时间
* @param period 周期
*/
public ScheduledFuture scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit);
/**
2. 类结构 2.1 继承结构
* 固定延时周期执行任务
* @param initialDelay 第一次执行的延迟时间
* @param delay 上一次执行结束到下一次执行开始的间隔时间
*/
public ScheduledFuture scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit);
ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,是一种特殊的线程池,拥有 execute()和 submit()提交异步任务功能。
ScheduledThreadPoolExecutor 类实现了ScheduledExecutorService,该接口定义了延时执行任务和周期执行任务的功能;
ScheduledThreadPoolExecutor 有两个重要的内部类:DelayedWorkQueue和ScheduledFutureTask。DelayedWorkQueue 实现了 BlockingQueue 接口,是一个阻塞队列;ScheduledFutureTask 继承了 FutureTask 类,是一个可以返回异步任务的结果的Runnable。
2.2 构造方法
通过构造方法可以看到,创建ScheduledThreadPoolExecutor其实就是创建一个线程池,corePoolSize可以指定,maximumPoolSize为Integer.MAX_VALUE,任务队列为DelayedWorkQueue。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
2.3 ScheduledFutureTask
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor 提交的任务时,将任务封装成 ScheduledFutureTask,当执行任务时通过ScheduledFutureTask的run()方法调用任务的run()方法。
ScheduledFutureTask的compareTo()方法用于延迟队列排序,按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
private class ScheduledFutureTask
extends FutureTask implements RunnableScheduledFuture {
private long time;// 下次执行时间
private final long period;// 周期
/**
* 比较方法,到下次执行时间短的任务优先
*/
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask x = (ScheduledFutureTask)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
DelayedWorkQueue 是一个基于小顶堆的数据结构,类似于 DelayQueue 和 PriorityQueue。DelayedWorkQueue 按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。之前已经详细介绍过 DelayQueue 和 PriorityQueue了,这里就不在重复了。
DelayedWorkQueue是存储ScheduledFutureTask阻塞队列。
插入元素时,会根据延期时间对元素排序,队头的元素是最先到期的;
取出元素时,只有在队头元素到期时才能够从队列中取元素。如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。
static class DelayedWorkQueue extends AbstractQueue
implements BlockingQueue {
// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根据初始容量创建RunnableScheduledFuture类型的数组
private RunnableScheduledFuture[] queue =
new RunnableScheduledFuture[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
//leader线程
private Thread leader = null;
//当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();
/**
* 根据延期时间对元素排序,队头的元素是最先到期的
*/
public boolean offer(Runnable x) {}
/**
* 只有在队头元素到期时才能够从队列中取元素。
* 如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。
*/
public RunnableScheduledFuture take() throws InterruptedException {}
以上文的小示例为例,通过源码来分析ScheduledThreadPoolExecutor的执行过程。
示例代码:
public class TimerDemo {
3.1 创建线程池定时器
public static void main(String[] args) {
// 1. 创建线程池定时器
ScheduledExecutorService timer = Executors.newScheduledThreadPool(3);
// 2. 提交定时任务:10ms后打印第一次,之后每隔30ms打印一次
timer.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(1);
}
}, 10, 30, TimeUnit.MILLISECONDS);
}
}
创建线程池定时器就是创建一个线程池:
/**
* Executors.newScheduledThreadPool(3);
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
3.2 添加任务
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Runable任务封装成ScheduledFutureTask;
任务加入延时队列,同时在队列中按照执行的时间顺序排序,最先执行的任务在队头;
确保线程池中有活动线程,如果没有就启动一个。
public ScheduledFuture scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {
// 一些检查
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 将command封装成ScheduledFutureTask,包括下次执行时间和周期
ScheduledFutureTask sft = new ScheduledFutureTask(command,
null, triggerTime(initialDelay, unit), unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;// 设置下次执行的任务
// 将ScheduledFutureTask类型的任务放入线程池延时执行,下文详细介绍
delayedExecute(t);
return t;
}
/**
* 将ScheduledFutureTask类型的任务放入线程池延时执行
*/
private void delayedExecute(RunnableScheduledFuture task) {
// 线程池关闭了,走拒绝策略
if (isShutdown())
reject(task);
else {
// 无论如何,将任务加入延时队列,插入队列时会根据下次执行时间排序
super.getQueue().add(task);
// 特殊情况,插入队列后线程池关闭了,需要从队列中删除
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();// 确保线程池中有线程执行
}
}
/**
3.3 执行任务
* 确保线程池中有线程执行
* 只有两种情况会启动线程:
* 1. 当前线程数小于corePoolSize,以corePoolSize为界限启动一个线程
* 2. 线程池参数corePoolSize=0且此时线程池中没有线程,以maximumPoolSize为界限启动一个线程
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 线程数小于corePoolSize,以corePoolSize为界限启动一个线程
if (wc < corePoolSize)
addWorker(null, true);
/*
* 线程数大于等于corePoolSize且线程数==0,其实就是线程池参数corePoolSize=0且此时线程池中没有线程
* 以maximumPoolSize为界限启动一个线程
*/
else if (wc == 0)
addWorker(null, false);
}
线程池中的活动线程会循环到任务队列中取任务,当队头任务还没到期时,线程阻塞至队头任务到期时间,然后再取任务;
取出任务后执行,因为任务是ScheduledFutureTask类型(添加任务时封装的),执行ScheduledFutureTask.run();
ScheduledFutureTask.run()执行当前任务,设置下次执行时间并将任务放入线程池;
线程池中的活动线程会循环到任务队列中取任务,...循环...
/**
* ScheduledThreadPoolExecutor提交的任务被封装成ScheduledFutureTask,所以任务执行要通过这个run()方法
*/
public void run() {
boolean periodic = isPeriodic();// 是否是周期执行
// 当前线程池运行状态下如果不可以执行任务,取消该任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任务,直接调用FutureTask中的run方法执行
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性任务,调用FutureTask中的runAndReset方法执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();// 计算下次执行该任务的时间
reExecutePeriodic(outerTask);// 将新任务再次放入线程池等待被执行任务
}
}
/**
* 计算下次执行该任务的时间
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
/**
3.3 执行过程总结
* 将新任务再次放入线程池等待被执行任务
*/
void reExecutePeriodic(RunnableScheduledFuture task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);// 加入延时队列
// 删除不符合条件任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();// 确保线程池中有线程执行
}
}
Runable任务封装成ScheduledFutureTask;
任务加入延时队列,同时在队列中按照执行的时间顺序排序,最先执行的任务在队头;
确保线程池中有活动线程,如果没有就启动一个;
线程池中的活动线程会循环到任务队列中取任务,当队头任务还没到期时,线程阻塞至队头任务到期时间,然后再取任务;
取出任务后执行,因为任务是ScheduledFutureTask类型(添加任务时封装的),执行ScheduledFutureTask.run();
ScheduledFutureTask.run()执行当前任务,设置下次执行时间并将任务放入线程池;
线程池中的活动线程会循环到任务队列中取任务,...循环...
4. scheduleAtFixedRate() VS scheduleWithFixedDelay()
scheduleAtFixedRate(); 固定周期执行任务,每次执行的开始时间之间的间隔是固定的,最开始就能够确定之后每次执行的时间;
scheduleWithFixedDelay(); 固定延时周期执行任务,上一次执行结束到下一次执行开始的间隔时间是固定的,由于每次执行任务花费时间不一定相同,所以只有在上次执行结束之后才能确定下次执行开始的时间。
从源码角度理解scheduleAtFixedRate()和scheduleWithFixedDelay()的不同,由两个细节决定:
细节一:构造ScheduledFutureTask时,scheduleAtFixedRate传入period(>0),而scheduleWithFixedDelay传入-delay(<0)。
public ScheduledFuture scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));// z这里是period
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));// 这里是-delay
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
细节二:执行完一次后设置下次执行时间时,
p>0,scheduleAtFixedRate,下次执行开始时间=上次开始执行时间+周期
period<=0,scheduleWithFixedDelay,下次执行开始时间=上次结束执行时间(now)+周期
/**
* 计算下次执行该任务的时间
*/
private void setNextRunTime() {
long p = period;
// p>0,scheduleAtFixedRate,下次执行开始时间=上次开始执行时间+周期
if (p > 0)
time += p;
// period<=0,scheduleWithFixedDelay,下次执行开始时间=上次结束执行时间+周期
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
5. 总结
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
ScheduledThreadPoolExecutor 可以用来在给定延时后执行异步任务或者周期性执行任务,也就是我们说的定时器。ScheduledThreadPoolExecutor基于线程池,通过多线程实现延时和周期执行。
ScheduledThreadPoolExecutor的四种用法:
schedule(Runnable command, long delay, TimeUnit unit);达到给定的延时时间后,执行任务,Runnable不能返回结果;
schedule(Callable callable, long delay, TimeUnit unit); 达到给定的延时时间后,执行任务,Callable可以返回结果;
第三种scheduleAtFixedRate(); 固定周期执行任务,每次执行的开始时间之间的间隔是固定的,最开始就能够确定之后每次执行的时间;
第四种scheduleWithFixedDelay(); 固定延时周期执行任务,上一次执行结束到下一次执行开始的间隔时间是固定的,由于每次执行任务花费时间不一定相同,所以只有在上次执行结束之后才能确定下次执行开始的时间。
执行过程:
Runable任务封装成ScheduledFutureTask;
任务加入延时队列,同时在队列中按照执行的时间顺序排序,最先执行的任务在队头;
确保线程池中有活动线程,如果没有就启动一个;
线程池中的活动线程会循环到任务队列中取任务,当队头任务还没到期时,线程阻塞至队头任务到期时间,然后再取任务;
取出任务后执行,因为任务是ScheduledFutureTask类型(添加任务时封装的),执行ScheduledFutureTask.run();
ScheduledFutureTask.run()执行当前任务,设置下次执行时间并将任务放入线程池;
线程池中的活动线程会循环到任务队列中取任务,...循环...
并发系列文章汇总
之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。
《java面试宝典5.0》(初中级)
《350道Java面试题:整理自100+公司》(中高级)
《资深java面试宝典-视频版》(资深)
《Java[BAT]面试必备》(资深)
分别适用于初中级,中高级,资深级工程师的面试复习。
内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。
获取方式:点“在看”,V信关注上述Java最全面试题库号并回复【面试】即可领取,更多精彩陆续奉上。
看到这里,证明有所收获
// ......
3. 执行过程
}
/**
2.4 DelayedWorkQueue
* ScheduledThreadPoolExecutor提交的任务被封装成ScheduledFutureTask,所以任务执行要通过这个run()方法
*/
public void run() {
boolean periodic = isPeriodic();// 是否是周期执行
// 当前线程池运行状态下如果不可以执行任务,取消该任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任务,直接调用FutureTask中的run方法执行
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性任务,调用FutureTask中的runAndReset方法执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();// 计算下次执行该任务的时间
reExecutePeriodic(outerTask);// 将新任务再次放入线程池等待被执行任务
}
}
}