最近公司需要实现一个订单超时自动关闭的功能,由Java这块来实现
一开始我以为就是定时任务,深入了解了之后发现并不是,官方名称应该叫延时任务,到时间之后 执行传过来的回调函数
这个功能我一共前前后后写了三版,写完第三版之后回头看第一版写的,简直就是****(脏话)
第二版是采用多线程轮询机制实现的 但是针对到时并发执行有很大问题 虽然实际项目中还没有达到高并发 但还是一直想实现一个完美的方案 于是有了第三版
第三版使用了DelayQueue 其实实现原理是一样的 只是到时执行的入口 和 任务的结构改了一下 这两版我都有使用redis做备份 以防项目挂掉任务丢失 解决了多任务同一时间并发问题 和redis储存结构问题
如果有更好的方案 希望您可以不吝啬的与我分享一下
PS:RedisService的方案自己写就可以了 我使用的是 StringRedisTemplate + Jedis
/** * @Author : Yanqiang * @Date : 2019/1/18 * @Param : [db] * @return : void * @Description : 切换不同db */ public void switchRedisDb( int db){ JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) stringRedisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(db); stringRedisTemplate.setConnectionFactory(jedisConnectionFactory); ValueOperations valueOperations = stringRedisTemplate.opsForValue(); } /** * 写入缓存 */ public void setStringRedis(final String key, String value){ ValueOperations<String, String> operations = stringRedisTemplate.opsForValue(); operations.set(key, value); }
/** * 获取所有key * @return */ public Set<String> getKeys(){ Set<String> keys = redisTemplate.keys("*"); return keys; }
下面是第二版
参数: key:时间戳 callback:回调函数
接口: executeTimer(String date, String callback) 添加任务
running() 执行任务
/** * @ClassName : MyTask * @Author : Yanqiang * @Date : 2019/1/16 * @Description : 任务实体类 */ public class MyTask implements Runnable { private String date;//到时时间戳 private String callback;//到时回调函数 public MyTask(String date, String callback) { this.date = date; this.callback = callback; } /** * @Author : Yanqiang * @Date : 2019/1/17 * @Param : [] * @return : void * @Description : 任务 */ @Override public void run() { String result = HttpClientUtil.httpGetMethodString(callback);//httpclient调用 Thread t1 = new Thread(() -> { //把重试交给子线程处理 防止占用线程池 retryHttpClient(date, result, callback); }); t1.start(); System.out.println("已执行 ===编号:"+date+" 路径:"+callback+"===="); } /** * @Author : Yanqiang * @Date : 2019/1/17 * @Param : [firstResult, httpUrl] * @return : void * @Description : 重试机制 三次 5秒 10秒 15秒 如果还返回失败 跳过不做处理 */ public void retryHttpClient(String time,String firstResult, String httpUrl) { RedisService redisService = ApplicationContextProvider.getBean(RedisService.class); JSONObject jsonResult = JSONObject.fromObject(firstResult); if (null != jsonResult) { int errno = (int) jsonResult.get("errno"); //取结果判断是否执行成功 if (errno != 0) { try { //5秒 10秒 15秒 for (int i = 1; i <= 3; i++) { Thread.sleep(5000 * i); String result = HttpClientUtil.httpGetMethodString(httpUrl); //logger.info("=====执行重试 地址: " + httpUrl + " 返回:" + result + "======"); JSONObject retryResult = JSONObject.fromObject(result); if (null != retryResult) { int retryErrno = (int) retryResult.get("errno"); //如果执行成功 跳出重试 if (retryErrno == 0) { break; }else { //把错误的存入redis DB 3库 redisService.switchRedisDb(3); redisService.setStringRedis(time,callback); //切回redis DB 2 redisService.switchRedisDb(2); } } } } catch (InterruptedException e) { e.printStackTrace(); } } } } }
@Component
@Service
public class TimerReviseService extends Thread implements CommandLineRunner {
private final static Logger logger = LoggerFactory.getLogger(TimerReviseService.class);
@Resource
RedisService redisService;
//线程安全 并且有序的map (key大小排序)
ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();
/**
* 超出核心线程的任务会创建新的线程来执行,
* 最多线程为最大线程数,核心线程为核心线程数,
* 在任务完成后空闲超过5秒就会被回收
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,//核心线程数
100,//最大线程数
10,//空闲时间为5秒
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
/**
* 添加任务
*/
@Transactional
public BaseResult executeTimer(String date, String callback) {
BaseResult baseResult = new BaseResult();
map.put(date, callback);//扔进任务
redisService.set(date, callback);//扔进redis
String log = "key:" + date + " value:" + callback + " 加入 队列与redis; ";
logger.info(log);
baseResult.setData(log);
return baseResult;
}
@Override
public void run(String... strings) {
this.running();
}
public void running() {
//项目启动先从redis读取缓存数据
ConcurrentSkipListMap<String, String> redisMap = new ConcurrentSkipListMap<>();
Set<String> rediskeys = redisService.getKeys();
if (null != rediskeys && rediskeys.size()>0){
for (String key:rediskeys) {
String value = (String) redisService.get(key);
redisMap.put(key,value);
}
map.putAll(redisMap);
}
//问题就在这里 如果两个甚至几十个任务同一时间 没办法同时执行 而且使用毫秒级的时间戳做key不能保证完全不重复
Thread t1 = new Thread(() -> {
logger.info("===开始轮询是否有延迟任务===");
while (true) {
try {
if (map.size()>0){
Map.Entry<String, String> stringStringEntry = map.firstEntry();//取出第一个 也就是key最小 最早应该执行的任务
String key = stringStringEntry.getKey();
if (stampToMillis(key) < 0) {//与当前时间相比较 算出相差几秒 <=0 执行
String value = stringStringEntry.getValue();
executor.execute(new MyTask(key,value));//使用线程池执行
map.remove(key);//任务队列中删除
redisService.remove(key);//redis中删除
}
}
sleep(1000);//1秒查一次
} catch (Exception e) {
logger.error(StatusCode.TIMER_SERVER_RUN_ERROR.getMsg());
e.printStackTrace();
}
}
});
t1.start();
}
/**
* @return : long
* @Author : Yanqiang
* @Date : 2019/1/8
* @Param : [stampStr]
* @Description : 计算两个时间戳相差秒数
*/
public long stampToMillis(String stampStr) {
long thisTime = new Long(stampStr);
long systemTime = new Date().getTime();
long c = 0;
if (stampStr.length() == 13) {
c = (thisTime - systemTime) / 1000;
} else {
long systemTime10 = new Date().getTime() / 1000;
c = (thisTime - systemTime10);
}
return c;
}
以下是使用了DelayQueue的方案
网上的方案都拆分的比较细 好多个类 我把它们简化集合到一起了
1.优化了key方案
2.内存中使用对象 而不是map 多任务分离 到时互不影响
/** * @Author : Yanqiang * @Date : 2019/1/24 * @Param : * @return : * @Description : 任务调度系统 * 与上面相同 只是重写了Delayed 的 compareTo(),getDelay(),hashCode(),equals()方法 */ public class Task implements Runnable,Delayed { private final static Logger logger = LoggerFactory.getLogger(Task.class); //到期时间 private final long time; //redisKey private final String redisKey; //问题对象 private final String task; private static final AtomicLong atomic = new AtomicLong(0); private final long n; public Task(long timeout, String t,String redisKey) { this.time = timeout; this.task = t; this.n = atomic.getAndIncrement(); this.redisKey = redisKey; } /** * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示 */ @Override public long getDelay(TimeUnit unit) { String thisTime = String.valueOf(time); long systemTime = new Date().getTime(); long c = 0; if (thisTime.length() == 13) { c = (time - systemTime) / 1000; } else { long systemTime10 = new Date().getTime() / 1000; c = (time - systemTime10); } return unit.convert(c, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed other) { if (other == this) return 0; if (other instanceof Task) { Task x = (Task) other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (n < x.n) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } public String getTask() { return this.task; } @Override public int hashCode() { return task.hashCode(); } @Override public boolean equals(Object object) { if (object instanceof Task) { return object.hashCode() == hashCode() ? true : false; } return false; } @Override public void run() { String result = HttpClientUtil.httpGetMethodString(task); RedisService redisService = ApplicationContextProvider.getBean(RedisService.class); redisService.remove(redisKey); Thread t1 = new Thread(() -> { //把重试交给子线程处理 防止占用线程池 retryHttpClient(result, task); }); t1.start(); logger.info("已执行 ===编号:"+time+" 路径:"+task+"===="); } /** * @Author : Yanqiang * @Date : 2019/1/17 * @Param : [firstResult, httpUrl] * @return : void * @Description : 重试机制 三次 5秒 10秒 15秒 如果还返回失败 跳过不做处理 */ public void retryHttpClient(String firstResult, String httpUrl) { RedisService redisService = ApplicationContextProvider.getBean(RedisService.class); JSONObject jsonResult = JSONObject.fromObject(firstResult); if (null != jsonResult) { int errno = (int) jsonResult.get("errno"); //取结果判断是否执行成功 if (errno != 0) { try { //5秒 10秒 15秒 for (int i = 1; i <= 3; i++) { Thread.sleep(5000 * i); String result = HttpClientUtil.httpGetMethodString(httpUrl); //logger.info("=====执行重试 地址: " + httpUrl + " 返回:" + result + "======"); JSONObject retryResult = JSONObject.fromObject(result); if (null != retryResult) { int retryErrno = (int) retryResult.get("errno"); //如果执行成功 跳出重试 if (retryErrno == 0) { break; }else { if (i==3) { redisService.switchRedisDb(3); redisService.setStringRedis(redisKey, task); redisService.switchRedisDb(2); } } } } } catch (InterruptedException e) { e.printStackTrace(); } } } } public long getTime() { return time; } }
/** * @ClassName : TaskQueueDaemonThread * @Author : Yanqiang * @Date : 2019/1/24 * @Description : */ @Service public class TaskQueueDaemonThread implements CommandLineRunner { private final static Logger logger = LoggerFactory.getLogger(TaskQueueDaemonThread.class); @Resource RedisService redisService; //创建一个DelayQueue private DelayQueue<Task> queue = new DelayQueue<>(); private TaskQueueDaemonThread() { } private static class LazyHolder { private static TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread(); } public static TaskQueueDaemonThread getInstance() { return LazyHolder.taskQueueDaemonThread; } Executor executor = Executors.newFixedThreadPool(100); //守护线程 private Thread daemonThread; //初始化守护线程 public void init() { //项目启动先从redis读取缓存数据 Set<String> rediskeys = redisService.getKeys(); if (null != rediskeys && rediskeys.size() > 0) { for (String key : rediskeys) { String value = (String) redisService.get(key); //拿到key解析 去除 -后四位随机数 StringBuffer buffer = new StringBuffer(key); String substring = buffer.substring(0, key.length() - 5); //放入队列 queue.put(new Task(Long.valueOf(substring), value, key)); } } daemonThread = new Thread(() -> execute()); daemonThread.setDaemon(true); //daemonThread.setName("任务队列守护进程线程"); daemonThread.start(); } private void execute() { while (true) { try { //从queue中取值 取出之后 queue中会删除此值 Task task = queue.take(); if (task != null) { executor.execute(task);//多线程执行任务 redisService.remove(String.valueOf(task.getTime()));//redis中删除 } } catch (Exception e) { e.printStackTrace(); continue;//发生异常 跳过此次 } } } /** * 添加任务, * time 延迟时间 * task 任务 * 用户为问题设置延迟时间 */ public BaseResult executeTimer(String time, String callback) { BaseResult baseResult = new BaseResult(); //key = 原key + 四位随机数 String randomNum = String.valueOf((long) (Math.random() * 8999) + 1000); String randomAndKey = time + "+" + randomNum; //创建一个任务,将任务放在延迟的队列中 queue.put(new Task(Long.valueOf(time), callback, randomAndKey)); redisService.set(randomAndKey, callback); String log = "key:" + time + " value:" + callback + " 加入队列与redis key:" + randomAndKey; logger.info(log); baseResult.setData(log); return baseResult; } /** * @Author : Yanqiang * @Date : 2019/1/24 * @Param : [strings] * @return : void * @Description : 启动入口 */ @Override public void run(String... strings) { //this.init(); } }