DelayQueue 类关系图
- 从类关系图谱上看,本质上具有集合、队列、阻塞阻塞队列、延迟等特性
应用场景:
- 延迟队列(类似RocketMQ中提供的机制)
- 定时任务(定时触发某个任务)
核心原理:
- 队列中的元素按到期时间排好序;
- 假设存在3个消费者线程
- 线程1通过争抢成为了leader
- 线程1查看队列头部元素
- 发现需要2s后到期,则进入睡眠状态2s后唤醒
- 此时线程2、3处于待命状态,不会做任何事情
- 线程1唤醒后,拿到对象1后,向线程2、3发送signal
- 线程2、3收到信号后,争抢leader
- 此处假设线程2抢到leader
- 线程2查看对象2状态,休眠3s后唤醒
- 后续逻辑与线程1逻辑类同
- 线程2被唤醒后,线程3成为leader进入等待状态
- 此时,若线程1已处理完毕,则继续处于待命状态
- 若线程1未处理完毕,则继续处理
- 一种不好的情况,3个线程因处理时间较长,目前都在处理中状态;
- 此时对象4快要到期了,没有消费者线程空下来消费
- 此时对象4的处理会延期
- 如果元素进入队列很快、且元素间到期时间相对集中,并且元素处理时间较长时,可能造成队列元素堆积情况
- 还有一种特殊情况,若目前处于左图现状
- 队列中的头元素突然发生变化
- 因为leader是取头元素的,此时的leader将没有意义
- 则将把当前leader = null
- 此时可能唤醒线程2、3中的某一个成为新的leader
- 新的leader将重新查看当前队列中最新的头元素
- 再后面的逻辑与上述一致;
核心方法offer()
核心方法take()
offfer() ->插入元素到队列中
peek() -> 窥视 查看
await() -> 待命
awaitNanos - > 等待
signal() -> 发出信号
poll() -> 从队列中弹出头部元素
lockInterruptibly() ->加了一把可中断锁
延迟队列实现代码
/**
* @author qinchen
* @date 2021/6/17 14:27
* @description 延迟队列数据对象
*/
public class Order implements Delayed {
/**
* 延迟时间
*/
private Long delayTime;
private String name;
public Order(Long delayTime, String name) {
this.delayTime = System.currentTimeMillis() + delayTime;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
Long t = this.delayTime - order.delayTime;
if( t > 0) {
return 1;
}
if( t < 0) {
return -1;
}
return 0;
}
public String getName() {
return name;
}
}
public class OrderConsumer implements Runnable{
private DelayQueue<Order> queue;
public OrderConsumer(DelayQueue<Order> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Order take = queue.take();
System.out.println("消费的订单名称:" + take.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Order order1 = new Order(5000L, "Order1");
Order order2 = new Order(12000L, "Order2");
Order order3 = new Order(3000L, "Order3");
DelayQueue<Order> queue = new DelayQueue<>();
queue.offer(order1);
queue.offer(order2);
queue.offer(order3);
ExecutorService exec = new ThreadPoolExecutor(4, 8,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
exec.execute(new OrderConsumer(queue));
exec.shutdown();