1. 延迟任务方案

延迟任务的实现方案有很多,常见的有四类:

以上四种方案都可以解决问题,其中采用RabbitMQ实现延迟任务可参考我另一篇博客RabbitMQ安装配置,封装工具类,发送消息及监听,延迟消息

本例中我们会使用DelayQueue方案。这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。

但缺点也很明显,就是不能在分布式环境下使用,需要占用JVM内存,且在数据量非常大的情况下可能会有问题。。

如果项目中数据量非常大,DelayQueue不能满足业务需求,也可以替换为其它延迟队列方式,例如Redisson、MQ等

2. DelayQueue的原理

首先来看一下DelayQueue的源码:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    
    // ... 略
}

可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务

我们可以看到DelayQueue的泛型定义:

DelayQueue<E extends Delayed>

这说明存入DelayQueue内部的元素必须是Delayed类型,这其实就是一个延迟任务的规范接口。来看一下:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

从源码中可以看出,Delayed类型必须具备两个方法:

  • getDelay():获取延迟任务的剩余延迟时间
  • compareTo(T t):比较两个延迟任务的延迟时间,判断执行顺序

可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。

新建延迟任务时,放在这样的一个Delayed类型的延迟任务并设定固定的延迟时间到DelayQueue队列。DelayQueue会调用compareTo方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。

3. DelayQueue的用法

首先定义一个Delayed类型的延迟任务类,要能保持任务数据。

package com.gzdemo.delay.utils;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class DelayedTask<T> implements Delayed {

    private T data;// 数据
    private long deadLineTime;// 当前消息什么时候到期

    // Duration jdk 提供的一个可以表示指定时间单位 时间的 对象
    public DelayedTask(T data, Duration duration) {
        this.data = data;
        deadLineTime =System.currentTimeMillis() +duration.toMillis();
    }

    /**
     * DelayQueue 的take() 方法 会不断调用   本方法,获取消息的剩余时间
     *  1) 消息剩余时间<=0 表示消息到期
     *  2) >0 表示消息未到期
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        //log.info("getDelay方法被调用了");
        //return  deadLineTime-System.currentTimeMillis();
        return unit.convert(deadLineTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }
    /**
     * 如果有多个消息,则 底层存储消息时会调用该方法进行排序
     * 返回值:
     *    如果>0  表示当前任务 到期时间 长,应该放入队列的后面
     *    如果<=0  表示当前任务 到期时间 断,应该放入队列的前面
     */
    @Override
    public int compareTo(Delayed other) {
        // 获取当前任务时间和其他任务时间的 差
        Long i = this.getDelay(TimeUnit.SECONDS) -other.getDelay(TimeUnit.SECONDS);
        return i.intValue();
    }
}

接下来就可以创建延迟任务,交给延迟队列保存:

import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.DelayQueue;

@Slf4j
@RestController
public class TestController {

    @GetMapping("/addDelayedTask")
    public String addDelayedTask() throws InterruptedException {
        DelayQueue<DelayedTask<String>> queue = new DelayQueue(); //  模拟了一个 消息队列(理解为搭建了一台RabbitMQ)

        // 模拟消息的生产者
        queue.add(new DelayedTask("任务1的数据", Duration.ofSeconds(8))); // 添加任务到队列
        queue.add(new DelayedTask("任务2的数据", Duration.ofSeconds(3))); // 添加任务到队列
        queue.add(new DelayedTask("任务3的数据", Duration.ofSeconds(5))); // 添加任务到队列
        // queue.add(new DelayedTask("任务3的数据")); // 添加任务到队列
        log.info("{} 消息放入了...", LocalTime.now());
        // 模拟消息的消费者
        while (true) {
            // take() 方法可以理解为  监听器 Listener , 如果队列中有要返回的任务,则返回任务,放行
            // 如果没有就等待
            DelayedTask<String> task = queue.take();

            log.info("{} 收到了消息:{}",LocalTime.now(),task.getData());
            String data = task.getData();

        }
    }
}

注意

这里我们是直接同一个线程来执行任务了。当没有任务的时候线程会被阻塞。而在实际开发中,我们会准备线程池,开启多个线程来执行队列中的任务。

测试
使用DelayQueue实现延迟任务-LMLPHP

4. 项目中使用案例

工具类,这原本是一个Redis相关的业务工具类,在创建使用redis过程中,需要用到延时任务,在这里把除了延时任务外的别的内容去掉了,展示了项目中要使用DelayQueue的大概骨架。

import com.gzdemo.delay.domain.LearningRecord;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;

@Component
@Slf4j
@RequiredArgsConstructor
public class LearningRecordCacheHandler {
    private DelayQueue<DelayedTask<RecordQueueData>> queue = new DelayQueue<>();

    // 消息的消费时机: 启动时拉取
    /**
     *  @PostConstruct: 是spring 支持的一个注解
     *                  写到方法上,表述 当前类的构造方法执行后执行: 初始化对象后执行
     */
    @PostConstruct
    public void  init(){
        System.out.println("主线程...对象创建了");
        // 启动时开启一个子线程,主线程不会阻塞
        // Thread 对象 创建的线程,不支持后续扩展线程池
//        new Thread(() -> {
//               this.dealData();;
//        }).start();
        // jdk 8 提供了支持线程池,创建线程的方式 CompletableFuture
        CompletableFuture.runAsync(()->{
            this.dealData();;
        });
    }
    public void  dealData(){
        while (true){
            try {
                // 获取延时消息
                DelayedTask<RecordQueueData> task = queue.take();
                RecordQueueData data = task.getData();
                log.info("{},延时队列获取到了消息:{}", LocalTime.now(),data);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 写入消息到队列
    public void writeRecord2Queue(LearningRecord record){
        log.info("{},延时队列放入了消息lessonId为{}",LocalTime.now(),record.getLessonId());
        queue.add(new DelayedTask<>(new RecordQueueData(record), Duration.ofSeconds(15)));
    }
    // 队列中的消息对象
    @Data
    @NoArgsConstructor
    public class RecordQueueData{
        private Integer moment;
        private Long lessonId;
        private Long sectionId;

        public RecordQueueData(LearningRecord record) {
            this.moment = record.getMoment();
            this.lessonId = record.getLessonId();
            this.sectionId = record.getSectionId();
        }
    }

}

通用的DelayedTask

@Data
@Slf4j
public class DelayedTask<T> implements Delayed {

    private T data;// 数据
    private long deadLineTime;// 当前消息什么时候到期

    // Duration jdk 提供的一个可以表示指定时间单位 时间的 对象
    public DelayedTask(T data, Duration duration) {
        this.data = data;
        deadLineTime =System.currentTimeMillis() +duration.toMillis();
    }

    /**
     * DelayQueue 的take() 方法 会不断调用   本方法,获取消息的剩余时间
     *  1) 消息剩余时间<=0 表示消息到期
     *  2) >0 表示消息未到期
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        //log.info("getDelay方法被调用了");
        //return  deadLineTime-System.currentTimeMillis();
        return unit.convert(deadLineTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }
    /**
     * 如果有多个消息,则 底层存储消息时会调用该方法进行排序
     * 返回值:
     *    如果>0  表示当前任务 到期时间 长,应该放入队列的后面
     *    如果<=0  表示当前任务 到期时间 断,应该放入队列的前面
     */
    @Override
    public int compareTo(Delayed other) {
        // 获取当前任务时间和其他任务时间的 差
        Long i = this.getDelay(TimeUnit.SECONDS) -other.getDelay(TimeUnit.SECONDS);
        return i.intValue();
    }
}

添加任务的测试controller

    @Autowired
    LearningRecordCacheHandler learningRecordCacheHandler;

    @GetMapping("/addDelayedTask2/{lessonId}")
    public String addDelayedTask2(@PathVariable Long lessonId) throws InterruptedException {
        LearningRecord record = LearningRecord.builder()
                                    .lessonId(lessonId)
                                    .moment(234)
                                    .sectionId(21412L)
                                    .build();
        learningRecordCacheHandler.writeRecord2Queue(record);
        return "添加消息成功";
    }

测试
使用DelayQueue实现延迟任务-LMLPHP

5. 完整的实际开发案例

package com.xxx.learning.utils;

import com.tianji.common.utils.JsonUtils;
import com.tianji.learning.domain.po.LearningLesson;
import com.tianji.learning.domain.po.LearningRecord;
import com.tianji.learning.mapper.LearningRecordMapper;
import com.tianji.learning.service.ILearningLessonService;
import com.tianji.learning.service.ILearningRecordService;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.function.Supplier;

/**
 * 添加学习记录处理器工具类
 * 1) 处理缓存
 * 2)
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class LearningRecordCacheHandler {
    private final StringRedisTemplate redisTemplate;
    private final ILearningLessonService lessonService;
    private final LearningRecordMapper recordMapper;
    // 缓存前缀
    private static final String LEARNING_RECORD_CACHE_PREFIX="learning:record:";
    private DelayQueue<DelayedTask<RecordQueueData>> queue = new DelayQueue<>();


    // 消息的消费时机: 启动时拉取

    /**
     *  @PostConstruct: 是spring 支持的一个注解
     *                  写到方法上,表述 当前类的构造方法执行后执行: 初始化对象后执行
     *    @PreDestroy   是spring 支持的一个注解
     *                   写到方法上,表述 当前类(正常关闭容器)销毁前执行该方法
     */
    @PostConstruct
    public void  init(){
        System.out.println("主线程...对象创建了");
       // 启动时开启一个子线程,主线程不会阻塞
        // Thread 对象 创建的线程,不支持后续扩展线程池
//        new Thread(() -> {
//               this.dealData();;
//        }).start();
        // jdk 8 提供了支持线程池,创建线程的方式 CompletableFuture
        CompletableFuture.runAsync(()->{
            this.dealData();;
        });
    }
    @PreDestroy
    public void  destroy(){
        System.out.println("对象销毁了");
    }
    public void  dealData(){
        while (true){
            try {
                // 获取延时消息
                DelayedTask<RecordQueueData> task = queue.take();
                RecordQueueData data = task.getData();
                log.info("延时队列获取到了消息");
                // 获取redis 中的消息
                LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId());
                /**
                 * 比较消息
                 *  如果消息不一致,证明: 用户在持续播放, 抛弃消息
                 */
                if(!data.getMoment().equals(record.getMoment())){
                    log.info("redis 中的数据和 延时消息不一致... 不处理");
                    continue;
                }
                log.info("redis 中的数据和 延时消息一致... 处理..写入数据库");
                // 一致 则证明用户暂停播放, 写入数据库
                // 更新 record
                LearningRecord newRecord = new LearningRecord();
                newRecord.setId(record.getId());
                newRecord.setMoment(record.getMoment());
                recordMapper.updateById(newRecord);
                // 更新 lesson
                LearningLesson lesson = new LearningLesson();
                lesson.setId(data.getLessonId());
                lesson.setLatestSectionId(data.sectionId);
                lesson.setLatestLearnTime(LocalDateTime.now().minusSeconds(20));
                lessonService.updateById(lesson);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }



    // 写入消息到队列
    public void  writeRecord2Queue(LearningRecord record){
        log.info("延时队列放入了消息:");
        queue.add(new DelayedTask<>(new RecordQueueData(record),Duration.ofSeconds(20)));
    }
    //1) 写入缓存
    public void writeRecordCache(LearningRecord record){
        log.info("缓存放入了消息:");
        // 1.转成只有三个属性的对象
        RecordCacheData cacheData = new RecordCacheData(record);
        // 2. 转成json 字符串
        String jsonData = JsonUtils.toJsonStr(cacheData);
        // 3 拼接key  learning:record:6688
        String key =LEARNING_RECORD_CACHE_PREFIX+record.getLessonId();
        // 4 存储缓存(覆盖)
        redisTemplate.opsForHash().put(key,record.getSectionId().toString(),jsonData);
        // 5 设置超时时间
        redisTemplate.expire(key, Duration.ofSeconds(60));

    }
    //2) 读取缓存
    public LearningRecord readRecordCache(Long lessonId,Long sectionId){
        //1 拼接key  learning:record:6688
        String key =LEARNING_RECORD_CACHE_PREFIX+lessonId;
        // 真正的是String
        Object o = redisTemplate.opsForHash().get(key, sectionId.toString());
        if(o==null){
            return null;
        }
        // {id:1,moment:30,finshsend:false}
        // 转成对象
        LearningRecord record = JsonUtils.toBean(o.toString(), LearningRecord.class);
        record.setLessonId(lessonId);
        record.setSectionId(sectionId);
        return record;
    }
    //3) 删除缓存
    public void removeRecordCache(Long lessonId,Long sectionId){
        //1 拼接key  learning:record:6688
        String key =LEARNING_RECORD_CACHE_PREFIX+lessonId;
        redisTemplate.opsForHash().delete(key,sectionId.toString());
    }
    // 存储缓存的对象
    @Data
    @NoArgsConstructor
    public class RecordCacheData{
        private Long id;
        private Boolean finished;
        private Integer moment;

        public RecordCacheData(LearningRecord record) {
            this.id = record.getId();
            this.finished = record.getFinished();
            this.moment = record.getMoment();
        }
    }
    // 队列中的消息对象
    @Data
    @NoArgsConstructor
    public class RecordQueueData{
        private Integer moment;
        private Long lessonId;
        private Long sectionId;

        public RecordQueueData(LearningRecord record) {
            this.moment = record.getMoment();
            this.lessonId = record.getLessonId();
            this.sectionId = record.getSectionId();
        }
    }

}

07-05 01:19