1. 延迟任务方案
延迟任务的实现方案有很多,常见的有四类:
以上四种方案都可以解决问题,其中采用RabbitMQ实现延迟任务可参考我另一篇博客RabbitMQ安装配置,封装工具类,发送消息及监听,延迟消息
本例中我们会使用DelayQueue方案。这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。
但缺点也很明显,就是不能在分布式环境下使用,需要占用JVM内存,且在数据量非常大的情况下可能会有问题。。
如果项目中数据量非常大,DelayQueue不能满足业务需求,也可以替换为其它延迟队列方式,例如Redisson、MQ等
2. DelayQueue的原理
首先来看一下DelayQueue的源码:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// ... 略
}
可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务。
我们可以看到DelayQueue的泛型定义:
DelayQueue<E extends Delayed>
这说明存入DelayQueue
内部的元素必须是Delayed
类型,这其实就是一个延迟任务的规范接口。来看一下:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
从源码中可以看出,Delayed类型必须具备两个方法:
getDelay()
:获取延迟任务的剩余延迟时间compareTo(T t)
:比较两个延迟任务的延迟时间,判断执行顺序
可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。
新建延迟任务时,放在这样的一个Delayed
类型的延迟任务并设定固定的延迟时间到DelayQueue
队列。DelayQueue
会调用compareTo
方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。
3. DelayQueue的用法
首先定义一个Delayed类型的延迟任务类,要能保持任务数据。
package com.gzdemo.delay.utils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class DelayedTask<T> implements Delayed {
private T data;// 数据
private long deadLineTime;// 当前消息什么时候到期
// Duration jdk 提供的一个可以表示指定时间单位 时间的 对象
public DelayedTask(T data, Duration duration) {
this.data = data;
deadLineTime =System.currentTimeMillis() +duration.toMillis();
}
/**
* DelayQueue 的take() 方法 会不断调用 本方法,获取消息的剩余时间
* 1) 消息剩余时间<=0 表示消息到期
* 2) >0 表示消息未到期
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
//log.info("getDelay方法被调用了");
//return deadLineTime-System.currentTimeMillis();
return unit.convert(deadLineTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 如果有多个消息,则 底层存储消息时会调用该方法进行排序
* 返回值:
* 如果>0 表示当前任务 到期时间 长,应该放入队列的后面
* 如果<=0 表示当前任务 到期时间 断,应该放入队列的前面
*/
@Override
public int compareTo(Delayed other) {
// 获取当前任务时间和其他任务时间的 差
Long i = this.getDelay(TimeUnit.SECONDS) -other.getDelay(TimeUnit.SECONDS);
return i.intValue();
}
}
接下来就可以创建延迟任务,交给延迟队列保存:
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.DelayQueue;
@Slf4j
@RestController
public class TestController {
@GetMapping("/addDelayedTask")
public String addDelayedTask() throws InterruptedException {
DelayQueue<DelayedTask<String>> queue = new DelayQueue(); // 模拟了一个 消息队列(理解为搭建了一台RabbitMQ)
// 模拟消息的生产者
queue.add(new DelayedTask("任务1的数据", Duration.ofSeconds(8))); // 添加任务到队列
queue.add(new DelayedTask("任务2的数据", Duration.ofSeconds(3))); // 添加任务到队列
queue.add(new DelayedTask("任务3的数据", Duration.ofSeconds(5))); // 添加任务到队列
// queue.add(new DelayedTask("任务3的数据")); // 添加任务到队列
log.info("{} 消息放入了...", LocalTime.now());
// 模拟消息的消费者
while (true) {
// take() 方法可以理解为 监听器 Listener , 如果队列中有要返回的任务,则返回任务,放行
// 如果没有就等待
DelayedTask<String> task = queue.take();
log.info("{} 收到了消息:{}",LocalTime.now(),task.getData());
String data = task.getData();
}
}
}
注意:
这里我们是直接同一个线程来执行任务了。当没有任务的时候线程会被阻塞。而在实际开发中,我们会准备线程池,开启多个线程来执行队列中的任务。
测试
4. 项目中使用案例
工具类,这原本是一个Redis相关的业务工具类,在创建使用redis过程中,需要用到延时任务,在这里把除了延时任务外的别的内容去掉了,展示了项目中要使用DelayQueue的大概骨架。
import com.gzdemo.delay.domain.LearningRecord;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
@Component
@Slf4j
@RequiredArgsConstructor
public class LearningRecordCacheHandler {
private DelayQueue<DelayedTask<RecordQueueData>> queue = new DelayQueue<>();
// 消息的消费时机: 启动时拉取
/**
* @PostConstruct: 是spring 支持的一个注解
* 写到方法上,表述 当前类的构造方法执行后执行: 初始化对象后执行
*/
@PostConstruct
public void init(){
System.out.println("主线程...对象创建了");
// 启动时开启一个子线程,主线程不会阻塞
// Thread 对象 创建的线程,不支持后续扩展线程池
// new Thread(() -> {
// this.dealData();;
// }).start();
// jdk 8 提供了支持线程池,创建线程的方式 CompletableFuture
CompletableFuture.runAsync(()->{
this.dealData();;
});
}
public void dealData(){
while (true){
try {
// 获取延时消息
DelayedTask<RecordQueueData> task = queue.take();
RecordQueueData data = task.getData();
log.info("{},延时队列获取到了消息:{}", LocalTime.now(),data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 写入消息到队列
public void writeRecord2Queue(LearningRecord record){
log.info("{},延时队列放入了消息lessonId为{}",LocalTime.now(),record.getLessonId());
queue.add(new DelayedTask<>(new RecordQueueData(record), Duration.ofSeconds(15)));
}
// 队列中的消息对象
@Data
@NoArgsConstructor
public class RecordQueueData{
private Integer moment;
private Long lessonId;
private Long sectionId;
public RecordQueueData(LearningRecord record) {
this.moment = record.getMoment();
this.lessonId = record.getLessonId();
this.sectionId = record.getSectionId();
}
}
}
通用的DelayedTask
@Data
@Slf4j
public class DelayedTask<T> implements Delayed {
private T data;// 数据
private long deadLineTime;// 当前消息什么时候到期
// Duration jdk 提供的一个可以表示指定时间单位 时间的 对象
public DelayedTask(T data, Duration duration) {
this.data = data;
deadLineTime =System.currentTimeMillis() +duration.toMillis();
}
/**
* DelayQueue 的take() 方法 会不断调用 本方法,获取消息的剩余时间
* 1) 消息剩余时间<=0 表示消息到期
* 2) >0 表示消息未到期
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
//log.info("getDelay方法被调用了");
//return deadLineTime-System.currentTimeMillis();
return unit.convert(deadLineTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 如果有多个消息,则 底层存储消息时会调用该方法进行排序
* 返回值:
* 如果>0 表示当前任务 到期时间 长,应该放入队列的后面
* 如果<=0 表示当前任务 到期时间 断,应该放入队列的前面
*/
@Override
public int compareTo(Delayed other) {
// 获取当前任务时间和其他任务时间的 差
Long i = this.getDelay(TimeUnit.SECONDS) -other.getDelay(TimeUnit.SECONDS);
return i.intValue();
}
}
添加任务的测试controller
@Autowired
LearningRecordCacheHandler learningRecordCacheHandler;
@GetMapping("/addDelayedTask2/{lessonId}")
public String addDelayedTask2(@PathVariable Long lessonId) throws InterruptedException {
LearningRecord record = LearningRecord.builder()
.lessonId(lessonId)
.moment(234)
.sectionId(21412L)
.build();
learningRecordCacheHandler.writeRecord2Queue(record);
return "添加消息成功";
}
测试
5. 完整的实际开发案例
package com.xxx.learning.utils;
import com.tianji.common.utils.JsonUtils;
import com.tianji.learning.domain.po.LearningLesson;
import com.tianji.learning.domain.po.LearningRecord;
import com.tianji.learning.mapper.LearningRecordMapper;
import com.tianji.learning.service.ILearningLessonService;
import com.tianji.learning.service.ILearningRecordService;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.function.Supplier;
/**
* 添加学习记录处理器工具类
* 1) 处理缓存
* 2)
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class LearningRecordCacheHandler {
private final StringRedisTemplate redisTemplate;
private final ILearningLessonService lessonService;
private final LearningRecordMapper recordMapper;
// 缓存前缀
private static final String LEARNING_RECORD_CACHE_PREFIX="learning:record:";
private DelayQueue<DelayedTask<RecordQueueData>> queue = new DelayQueue<>();
// 消息的消费时机: 启动时拉取
/**
* @PostConstruct: 是spring 支持的一个注解
* 写到方法上,表述 当前类的构造方法执行后执行: 初始化对象后执行
* @PreDestroy 是spring 支持的一个注解
* 写到方法上,表述 当前类(正常关闭容器)销毁前执行该方法
*/
@PostConstruct
public void init(){
System.out.println("主线程...对象创建了");
// 启动时开启一个子线程,主线程不会阻塞
// Thread 对象 创建的线程,不支持后续扩展线程池
// new Thread(() -> {
// this.dealData();;
// }).start();
// jdk 8 提供了支持线程池,创建线程的方式 CompletableFuture
CompletableFuture.runAsync(()->{
this.dealData();;
});
}
@PreDestroy
public void destroy(){
System.out.println("对象销毁了");
}
public void dealData(){
while (true){
try {
// 获取延时消息
DelayedTask<RecordQueueData> task = queue.take();
RecordQueueData data = task.getData();
log.info("延时队列获取到了消息");
// 获取redis 中的消息
LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId());
/**
* 比较消息
* 如果消息不一致,证明: 用户在持续播放, 抛弃消息
*/
if(!data.getMoment().equals(record.getMoment())){
log.info("redis 中的数据和 延时消息不一致... 不处理");
continue;
}
log.info("redis 中的数据和 延时消息一致... 处理..写入数据库");
// 一致 则证明用户暂停播放, 写入数据库
// 更新 record
LearningRecord newRecord = new LearningRecord();
newRecord.setId(record.getId());
newRecord.setMoment(record.getMoment());
recordMapper.updateById(newRecord);
// 更新 lesson
LearningLesson lesson = new LearningLesson();
lesson.setId(data.getLessonId());
lesson.setLatestSectionId(data.sectionId);
lesson.setLatestLearnTime(LocalDateTime.now().minusSeconds(20));
lessonService.updateById(lesson);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 写入消息到队列
public void writeRecord2Queue(LearningRecord record){
log.info("延时队列放入了消息:");
queue.add(new DelayedTask<>(new RecordQueueData(record),Duration.ofSeconds(20)));
}
//1) 写入缓存
public void writeRecordCache(LearningRecord record){
log.info("缓存放入了消息:");
// 1.转成只有三个属性的对象
RecordCacheData cacheData = new RecordCacheData(record);
// 2. 转成json 字符串
String jsonData = JsonUtils.toJsonStr(cacheData);
// 3 拼接key learning:record:6688
String key =LEARNING_RECORD_CACHE_PREFIX+record.getLessonId();
// 4 存储缓存(覆盖)
redisTemplate.opsForHash().put(key,record.getSectionId().toString(),jsonData);
// 5 设置超时时间
redisTemplate.expire(key, Duration.ofSeconds(60));
}
//2) 读取缓存
public LearningRecord readRecordCache(Long lessonId,Long sectionId){
//1 拼接key learning:record:6688
String key =LEARNING_RECORD_CACHE_PREFIX+lessonId;
// 真正的是String
Object o = redisTemplate.opsForHash().get(key, sectionId.toString());
if(o==null){
return null;
}
// {id:1,moment:30,finshsend:false}
// 转成对象
LearningRecord record = JsonUtils.toBean(o.toString(), LearningRecord.class);
record.setLessonId(lessonId);
record.setSectionId(sectionId);
return record;
}
//3) 删除缓存
public void removeRecordCache(Long lessonId,Long sectionId){
//1 拼接key learning:record:6688
String key =LEARNING_RECORD_CACHE_PREFIX+lessonId;
redisTemplate.opsForHash().delete(key,sectionId.toString());
}
// 存储缓存的对象
@Data
@NoArgsConstructor
public class RecordCacheData{
private Long id;
private Boolean finished;
private Integer moment;
public RecordCacheData(LearningRecord record) {
this.id = record.getId();
this.finished = record.getFinished();
this.moment = record.getMoment();
}
}
// 队列中的消息对象
@Data
@NoArgsConstructor
public class RecordQueueData{
private Integer moment;
private Long lessonId;
private Long sectionId;
public RecordQueueData(LearningRecord record) {
this.moment = record.getMoment();
this.lessonId = record.getLessonId();
this.sectionId = record.getSectionId();
}
}
}