定时任务

在项目开发过程中,经常需要定时任务来帮助我们实现某些业务功能,比如定时生成数据报表、生成对账单、订单超时处理等。Spring Boot提供了内置的@Scheduled注解实现定时任务的功能。


步骤

1.修改启动类

在启动类上加上@EnableScheduling开启定时任务。

@SpringBootApplication
@EnableScheduling
    public class Application {
    public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
    }
}

使用@EnableScheduling注解打开定时功能之后,默认情况下,系统会自动启动一个线程,调度执行定义的后台定时任务。

2.创建定时任务类

package com.qsdbl.malldemo.component;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author: 轻率的保罗
 * @since: 2022-11
 * @Description: 定时任务类
 * 注意:需在启动类上加上@EnableScheduling开启定时任务
 */
@Slf4j
@Component
public class SchedulerTask {

    /**
     * 定时任务:定期从数据库中移除已删除的数据(deleted=1的数据)
     * 测试每10秒执行一次!
     */
    @Scheduled(cron="*/10 * * * * ?")//每10秒执行一次
//    @Scheduled(fixedRate=10*1000)//每10秒执行一次
//    @Scheduled(initialDelay=3000, fixedRate=10000)//首次延迟3秒后执行,之后每10秒执行一次
    public void taskCron(){
        SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
        log.info("执行定时【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));
    }
}

参数

@Scheduled注解可以接受两种定时的参数设置:

  • 一种是我们常用的cron参数,设定按Cron表达式方式执行;
    • cron参数:使用Cron表达式规则执行,比如@Scheduled(cron=“*/5 * * * * *”)为间隔5秒执行。
  • 另一种是按fixedRate设定的固定时间执行。
    • fixedRate参数:按指定的固定时间间隔执行,单位为毫秒,如@Scheduled(fixedDelay =5000)为上一次执行完毕时间点之后5秒再次执行,每次执行间隔5秒。
    • 还支持简单的延时操作,比如fixedDelay、initialDelay后面填写相应的毫秒数即可:@Scheduled(initialDelay=1000, fixedRate=10000):首次延迟1秒后执行,之后按fixedRate指定的固定时间间隔执行,即每10秒执行一次。

cron表达式

Cron表达式主要由秒、分、小时、日期、月份、星期、年份7个字段构成,其中年份可选。示例中的Cron表达式“*/5 * * * *?”表示每5秒执行一次。

定时任务&多线程-springboot-LMLPHP


各个字段的含义

定时任务&多线程-springboot-LMLPHP

Cron一共有7位,最后一位是年份,可以留空。因此,一般我们可以写6位。另外,第6位星期(DayofWeek)的取值范围为1~7,从星期日(SUN)开始。


常用的Cron表达式

Cron表达式看起来晦涩难懂,但是只要明白了字段和通配符的含义,就能一眼看出表达式的触发执行规则。下边是一些常用的Cron表达式:

定时任务&多线程-springboot-LMLPHP


问题

默认情况下,Spring Boot定时任务是按单线程方式执行的。如果只有一个定时任务,这样做肯定没问题;当定时任务增多时,如果一个任务被阻塞,则会导致其他任务无法正常执行。


测试阻塞

测试多个定时任务造成阻塞现象,添加两个定时任务taskCron、taskFixedRate:

//定时任务类 测试多个定时任务造成阻塞现象

/**
 * 定时任务1,每15秒执行一次,会执行10秒(造成10秒阻塞)
 */
@Scheduled(fixedRate=15*1000)//每15秒执行一次
public void taskCron() throws InterruptedException {
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
    log.info("taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));
    //模拟延时
    Thread.sleep(10*1000);
}

/**
 * 定时任务2,每3秒执行一次
 */
@Scheduled(fixedRate=3*1000)//每3秒执行一次
public void taskFixedRate(){
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
    log.info("taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: " + dateFormat.format(new Date()));
}

分析日志

  • 定时任务taskFixedRate、taskCron都有按指定的配置运行
  • 所有的定时任务都是在线程"scheduling-1"执行 - 单线程方式执行
  • 定时任务taskCron在【18:16:05】开始执行,执行10秒钟;在【18:16:15】执行了三个定时任务taskFixedRate
    • 说明,在定时任务taskCron执行期间阻塞了其他定时任务的执行。本应分别在08、11、14秒执行的,推迟到了15秒才执行。
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:05
... [   scheduling-1] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:16:05
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15


... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:17
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:20
... [   scheduling-1] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:16:20


多线程

配置文件

springboot配置文件中添加如下配置:

# 异步线程(线程池)配置
# 阿里巴巴编程规范:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
# 配置核心线程数
async.executor.thread.core_pool_size=5
# 配置最大线程数
async.executor.thread.max_pool_size=10
# 配置队列大小
async.executor.thread.queue_capacity=100
# 配置线程池中的线程的名称前缀
# 阿里巴巴编程规范:创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
async.executor.thread.name.prefix=async-service-

配置类

使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类。自定义线程池,名为asyncServiceExecutor。

package com.qsdbl.malldemo.configuration.thread;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: 轻率的保罗
 * @since: 2022-11
 * @Description: 多线程配置类(注解@EnableAsync,开启异步事件支持)
 */
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {
    @Value("${async.executor.thread.core_pool_size}")//从配置文件中获取配置项
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    /**
     * 自定义线程池配置类。
     * 不要命名为 taskScheduler,与spring框架的bean重名。
     * @return
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        //阿里巴巴编程规范:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
        
        //SpringBoot项目,可使用Spring提供的对 ThreadPoolExecutor 封装的线程池 ThreadPoolTaskExecutor:
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//        ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();//自定义ThreadPoolTaskExecutor,会打印线程池情况
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        //     1、CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行。
        //        "该策略既不会抛弃任务,也不会抛出异常,而是将任务回推到调用者。"顾名思义,在饱和的情况下,调用者会执行该任务(而不是由多线程执行)
        //     2、AbortPolicy:拒绝策略,直接拒绝抛出异常
        //     3、。。。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

使用

  • 1、类上要加注解@Component(衍生注解也行,需交由spring容器管理)
    • 扩展:交由spring容器管理之后,可通过注解@Autowired在其他地方启用该异步事件(发送电子邮件、批量处理过期订单、批量同步数据等等)
  • 2、方法上增加@Async注解,使任务能够异步执行,这样各个后台任务就不会阻塞。@Async注解中指定使用我们配置的线程池asyncServiceExecutor。示例:@Async(value = "asyncServiceExecutor")

上边的测试相比多了个@Async(value = "asyncServiceExecutor")配置:

//定时任务类 测试多个定时任务造成阻塞现象

/**
 * 定时任务1,每15秒执行一次,会执行10秒(造成10秒阻塞)
 */
@Async(value = "asyncServiceExecutor")
@Scheduled(fixedRate=15*1000)//每15秒执行一次
public void taskCron() throws InterruptedException {
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
    log.info("taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));
    //模拟延时
    Thread.sleep(10*1000);
}

/**
 * 定时任务2,每3秒执行一次
 */
@Async(value = "asyncServiceExecutor")
@Scheduled(fixedRate=3*1000)//每3秒执行一次
public void taskFixedRate(){
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
    log.info("taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: " + dateFormat.format(new Date()));
}

分析日志

  • 定时任务taskFixedRate、taskCron都有按指定的配置运行
  • 所有的定时任务都是在我们配置的线程池"async-service-xxx"中执行 - 不再是单线程方式执行
  • 定时任务taskCron在【18:34:26】开始执行,执行10秒钟;在【18:34:29】执行了定时任务taskFixedRate
    • 没有出现阻塞其他定时任务执行的情况
... [async-service-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:26
... [async-service-2] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:34:26


... [async-service-3] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:29
... [async-service-4] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:32
... [async-service-5] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:35
... [async-service-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:38
... [async-service-4] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:34:41
... [async-service-3] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:41
... [async-service-5] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:44

扩展

监控 线程池。往线程池提交任务前,在日志中打印线程池情况。

自定义Executor

编写ThreadPoolTaskExecutor的子类MyThreadPoolTaskExecutor,往线程池提交任务前,在日志中打印线程池情况。

package com.qsdbl.malldemo.configuration.thread;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: 轻率的保罗
 * @since: 2022-11-30
 * @Description: 监控 线程池。往线程池提交任务前,在日志中打印线程池情况。
 */
@Slf4j
public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    /**
     * 打印 多线程信息
     * 在父类的execute、submit等方法里调用,每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中
     * @param method 提交任务执行的方法。
     *               submit():提交任务,返回Future,可以返回结果,可以传入Callable、Runnable。
     *               execute():提交任务,没有返回值,只可以传入Runnable。
     */
    private void showThreadPoolInfo(String method) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        log.info("\n\n提交任务前【线程池】情况:\n    {}\n    线程池当前线程数量 = {}\n    当前线程池中正在执行任务的线程数量 = {}\n    队列大小 = {}\n    线程池已执行和未执行的任务总数 = {}\n    已完成的任务数量 = {}\n",
                method,
                threadPoolExecutor.getPoolSize()+1,
                threadPoolExecutor.getActiveCount()+1,
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("do execute 提交任务");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("do execute 提交任务");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("do submit 提交任务");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("do submit 提交任务");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("do submitListenable 提交任务");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("do submitListenable 提交任务");
        return super.submitListenable(task);
    }
}

修改配置类

将上边配置类做如下修改:

在创建ThreadPoolTaskExecutor对象时,使用我们自定义的MyThreadPoolTaskExecutor。

...
//SpringBoot项目,可使用Spring提供的对 ThreadPoolExecutor 封装的线程池 ThreadPoolTaskExecutor:
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();//自定义ThreadPoolTaskExecutor,会打印线程池情况
//配置核心线程数
...

运行效果

提交任务前【线程池】情况:
    do submit 提交任务
    线程池当前线程数量 = 3
    当前线程池中正在执行任务的线程数量 = 2
    队列大小 = 0
    线程池已执行和未执行的任务总数 = 2
    已完成的任务数量 = 1

... [async-service-3] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 19:07:47

如何合理配置线程池

从两个方面考虑,你的任务是CPU密集型还是IO密集型。

CPU密集型

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。

CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。

CPU密集型任务配置尽可能少的线程数量,一般公式:CPU核数+1个线程的线程池。

IO密集型

IO密集型,即该任务需要大量的IO,即大量的阻塞。

在单线程上运行IO密集型的任务会导致大量的CPU运算能力浪费在等待上。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞!

IO密集型时,大部分线程都阻塞,故需要多配置线程数,参考公式:CPU核数/(1-阻塞系数),阻塞系数在0.8~0.9之间。

另一种说法:由于IO密集型任务,线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2


扩展

System.out.println(Runtime.getRuntime().availableProcessors()); // 查看CPU核数
MacOS 命令查看CPU信息:sysctl machdep.cpu

qsdbl@macbook mysql % sysctl machdep.cpu 
machdep.cpu.cores_per_package: 8
machdep.cpu.core_count: 8
machdep.cpu.logical_per_package: 8
machdep.cpu.thread_count: 8
machdep.cpu.brand_string: Apple M1


12-01 10:50