定时任务
在项目开发过程中,经常需要定时任务来帮助我们实现某些业务功能,比如定时生成数据报表、生成对账单、订单超时处理等。Spring Boot提供了内置的@Scheduled注解实现定时任务的功能。
步骤
1.修改启动类
在启动类上加上@EnableScheduling开启定时任务。
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
使用@EnableScheduling注解打开定时功能之后,默认情况下,系统会自动启动一个线程,调度执行定义的后台定时任务。
2.创建定时任务类
package com.qsdbl.malldemo.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author: 轻率的保罗
* @since: 2022-11
* @Description: 定时任务类
* 注意:需在启动类上加上@EnableScheduling开启定时任务
*/
@Slf4j
@Component
public class SchedulerTask {
/**
* 定时任务:定期从数据库中移除已删除的数据(deleted=1的数据)
* 测试每10秒执行一次!
*/
@Scheduled(cron="*/10 * * * * ?")//每10秒执行一次
// @Scheduled(fixedRate=10*1000)//每10秒执行一次
// @Scheduled(initialDelay=3000, fixedRate=10000)//首次延迟3秒后执行,之后每10秒执行一次
public void taskCron(){
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
log.info("执行定时【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));
}
}
参数
@Scheduled注解可以接受两种定时的参数设置:
- 一种是我们常用的cron参数,设定按Cron表达式方式执行;
- cron参数:使用Cron表达式规则执行,比如@Scheduled(cron=“*/5 * * * * *”)为间隔5秒执行。
- 另一种是按fixedRate设定的固定时间执行。
- fixedRate参数:按指定的固定时间间隔执行,单位为毫秒,如@Scheduled(fixedDelay =5000)为上一次执行完毕时间点之后5秒再次执行,每次执行间隔5秒。
- 还支持简单的延时操作,比如fixedDelay、initialDelay后面填写相应的毫秒数即可:@Scheduled(initialDelay=1000, fixedRate=10000):首次延迟1秒后执行,之后按fixedRate指定的固定时间间隔执行,即每10秒执行一次。
cron表达式
Cron表达式主要由秒、分、小时、日期、月份、星期、年份7个字段构成,其中年份可选。示例中的Cron表达式“*/5 * * * *?”表示每5秒执行一次。
各个字段的含义
Cron一共有7位,最后一位是年份,可以留空。因此,一般我们可以写6位。另外,第6位星期(DayofWeek)的取值范围为1~7,从星期日(SUN)开始。
常用的Cron表达式
Cron表达式看起来晦涩难懂,但是只要明白了字段和通配符的含义,就能一眼看出表达式的触发执行规则。下边是一些常用的Cron表达式:
问题
默认情况下,Spring Boot定时任务是按单线程方式执行的。如果只有一个定时任务,这样做肯定没问题;当定时任务增多时,如果一个任务被阻塞,则会导致其他任务无法正常执行。
测试阻塞
测试多个定时任务造成阻塞现象,添加两个定时任务taskCron、taskFixedRate:
//定时任务类 测试多个定时任务造成阻塞现象
/**
* 定时任务1,每15秒执行一次,会执行10秒(造成10秒阻塞)
*/
@Scheduled(fixedRate=15*1000)//每15秒执行一次
public void taskCron() throws InterruptedException {
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
log.info("taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));
//模拟延时
Thread.sleep(10*1000);
}
/**
* 定时任务2,每3秒执行一次
*/
@Scheduled(fixedRate=3*1000)//每3秒执行一次
public void taskFixedRate(){
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
log.info("taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: " + dateFormat.format(new Date()));
}
分析日志:
- 定时任务taskFixedRate、taskCron都有按指定的配置运行
- 所有的定时任务都是在线程"scheduling-1"执行 - 单线程方式执行
- 定时任务taskCron在【18:16:05】开始执行,执行10秒钟;在【18:16:15】执行了三个定时任务taskFixedRate
- 说明,在定时任务taskCron执行期间阻塞了其他定时任务的执行。本应分别在08、11、14秒执行的,推迟到了15秒才执行。
... [ scheduling-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:05
... [ scheduling-1] ... : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:16:05
... [ scheduling-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [ scheduling-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [ scheduling-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [ scheduling-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:17
... [ scheduling-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:20
... [ scheduling-1] ... : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:16:20
多线程
配置文件
springboot配置文件中添加如下配置:
# 异步线程(线程池)配置
# 阿里巴巴编程规范:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
# 配置核心线程数
async.executor.thread.core_pool_size=5
# 配置最大线程数
async.executor.thread.max_pool_size=10
# 配置队列大小
async.executor.thread.queue_capacity=100
# 配置线程池中的线程的名称前缀
# 阿里巴巴编程规范:创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
async.executor.thread.name.prefix=async-service-
配置类
使用@Configuration
和@EnableAsync
这两个注解,表示这是个配置类,并且是线程池的配置类。自定义线程池,名为asyncServiceExecutor。
package com.qsdbl.malldemo.configuration.thread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: 轻率的保罗
* @since: 2022-11
* @Description: 多线程配置类(注解@EnableAsync,开启异步事件支持)
*/
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {
@Value("${async.executor.thread.core_pool_size}")//从配置文件中获取配置项
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
/**
* 自定义线程池配置类。
* 不要命名为 taskScheduler,与spring框架的bean重名。
* @return
*/
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
//阿里巴巴编程规范:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
//SpringBoot项目,可使用Spring提供的对 ThreadPoolExecutor 封装的线程池 ThreadPoolTaskExecutor:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();//自定义ThreadPoolTaskExecutor,会打印线程池情况
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// 1、CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行。
// "该策略既不会抛弃任务,也不会抛出异常,而是将任务回推到调用者。"顾名思义,在饱和的情况下,调用者会执行该任务(而不是由多线程执行)
// 2、AbortPolicy:拒绝策略,直接拒绝抛出异常
// 3、。。。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
使用
- 1、类上要加注解@Component(衍生注解也行,需交由spring容器管理)
- 扩展:交由spring容器管理之后,可通过注解@Autowired在其他地方启用该异步事件(发送电子邮件、批量处理过期订单、批量同步数据等等)
- 2、方法上增加@Async注解,使任务能够异步执行,这样各个后台任务就不会阻塞。@Async注解中指定使用我们配置的线程池asyncServiceExecutor。示例:
@Async(value = "asyncServiceExecutor")
与上边的测试相比多了个@Async(value = "asyncServiceExecutor")
配置:
//定时任务类 测试多个定时任务造成阻塞现象
/**
* 定时任务1,每15秒执行一次,会执行10秒(造成10秒阻塞)
*/
@Async(value = "asyncServiceExecutor")
@Scheduled(fixedRate=15*1000)//每15秒执行一次
public void taskCron() throws InterruptedException {
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
log.info("taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));
//模拟延时
Thread.sleep(10*1000);
}
/**
* 定时任务2,每3秒执行一次
*/
@Async(value = "asyncServiceExecutor")
@Scheduled(fixedRate=3*1000)//每3秒执行一次
public void taskFixedRate(){
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
log.info("taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: " + dateFormat.format(new Date()));
}
分析日志:
- 定时任务taskFixedRate、taskCron都有按指定的配置运行
- 所有的定时任务都是在我们配置的线程池"async-service-xxx"中执行 - 不再是单线程方式执行
- 定时任务taskCron在【18:34:26】开始执行,执行10秒钟;在【18:34:29】执行了定时任务taskFixedRate
- 没有出现阻塞其他定时任务执行的情况
... [async-service-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:26
... [async-service-2] ... : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:34:26
... [async-service-3] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:29
... [async-service-4] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:32
... [async-service-5] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:35
... [async-service-1] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:38
... [async-service-4] ... : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:34:41
... [async-service-3] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:41
... [async-service-5] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:44
扩展
监控 线程池。往线程池提交任务前,在日志中打印线程池情况。
自定义Executor
编写ThreadPoolTaskExecutor的子类MyThreadPoolTaskExecutor,往线程池提交任务前,在日志中打印线程池情况。
package com.qsdbl.malldemo.configuration.thread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: 轻率的保罗
* @since: 2022-11-30
* @Description: 监控 线程池。往线程池提交任务前,在日志中打印线程池情况。
*/
@Slf4j
public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
/**
* 打印 多线程信息
* 在父类的execute、submit等方法里调用,每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中
* @param method 提交任务执行的方法。
* submit():提交任务,返回Future,可以返回结果,可以传入Callable、Runnable。
* execute():提交任务,没有返回值,只可以传入Runnable。
*/
private void showThreadPoolInfo(String method) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("\n\n提交任务前【线程池】情况:\n {}\n 线程池当前线程数量 = {}\n 当前线程池中正在执行任务的线程数量 = {}\n 队列大小 = {}\n 线程池已执行和未执行的任务总数 = {}\n 已完成的任务数量 = {}\n",
method,
threadPoolExecutor.getPoolSize()+1,
threadPoolExecutor.getActiveCount()+1,
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("do execute 提交任务");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("do execute 提交任务");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("do submit 提交任务");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("do submit 提交任务");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("do submitListenable 提交任务");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("do submitListenable 提交任务");
return super.submitListenable(task);
}
}
修改配置类
将上边配置类做如下修改:
在创建ThreadPoolTaskExecutor对象时,使用我们自定义的MyThreadPoolTaskExecutor。
...
//SpringBoot项目,可使用Spring提供的对 ThreadPoolExecutor 封装的线程池 ThreadPoolTaskExecutor:
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();//自定义ThreadPoolTaskExecutor,会打印线程池情况
//配置核心线程数
...
运行效果
提交任务前【线程池】情况:
do submit 提交任务
线程池当前线程数量 = 3
当前线程池中正在执行任务的线程数量 = 2
队列大小 = 0
线程池已执行和未执行的任务总数 = 2
已完成的任务数量 = 1
... [async-service-3] ... : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 19:07:47
如何合理配置线程池
从两个方面考虑,你的任务是CPU密集型还是IO密集型。
CPU密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。
CPU密集型任务配置尽可能少的线程数量,一般公式:CPU核数+1个线程的线程池。
IO密集型
IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运行IO密集型的任务会导致大量的CPU运算能力浪费在等待上。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞!
IO密集型时,大部分线程都阻塞,故需要多配置线程数,参考公式:CPU核数/(1-阻塞系数),阻塞系数在0.8~0.9之间。
另一种说法:由于IO密集型任务,线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2
扩展
System.out.println(Runtime.getRuntime().availableProcessors());
// 查看CPU核数
MacOS 命令查看CPU信息:sysctl machdep.cpu
qsdbl@macbook mysql % sysctl machdep.cpu
machdep.cpu.cores_per_package: 8
machdep.cpu.core_count: 8
machdep.cpu.logical_per_package: 8
machdep.cpu.thread_count: 8
machdep.cpu.brand_string: Apple M1