DelayQueue 类关系图

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

  • 从类关系图谱上看,本质上具有集合、队列、阻塞阻塞队列、延迟等特性

应用场景:

  • 延迟队列(类似RocketMQ中提供的机制)
  • 定时任务(定时触发某个任务)

核心原理:

  • 初始状态

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

  1. 队列中的元素按到期时间排好序;
  2. 假设存在3个消费者线程
  3. 线程1通过争抢成为了leader
  4. 线程1查看队列头部元素
  5. 发现需要2s后到期,则进入睡眠状态2s后唤醒
  6. 此时线程2、3处于待命状态,不会做任何事情
  7. 线程1唤醒后,拿到对象1后,向线程2、3发送signal
  8. 线程2、3收到信号后,争抢leader
  • 进一步状态

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

  1. 此处假设线程2抢到leader
  2. 线程2查看对象2状态,休眠3s后唤醒
  3. 后续逻辑与线程1逻辑类同
  4. 线程2被唤醒后,线程3成为leader进入等待状态
  5. 此时,若线程1已处理完毕,则继续处于待命状态
  6. 若线程1未处理完毕,则继续处理
  • 不良状态

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

  1. 一种不好的情况,3个线程因处理时间较长,目前都在处理中状态;
  2. 此时对象4快要到期了,没有消费者线程空下来消费
  3. 此时对象4的处理会延期
  4. 如果元素进入队列很快、且元素间到期时间相对集中,并且元素处理时间较长时,可能造成队列元素堆积情况
  • 特殊状态

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

  1. 还有一种特殊情况,若目前处于左图现状
  2. 队列中的头元素突然发生变化
  3. 因为leader是取头元素的,此时的leader将没有意义
  4. 则将把当前leader = null
  5. 此时可能唤醒线程2、3中的某一个成为新的leader
  6. 新的leader将重新查看当前队列中最新的头元素
  7. 再后面的逻辑与上述一致;

核心方法offer()

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

核心方法take()

无界阻塞延迟队列DelayQueue基本原理与使用-LMLPHP

  • 重要方法解释
offfer() ->插入元素到队列中
peek() -> 窥视 查看
await() -> 待命
awaitNanos - > 等待
signal() -> 发出信号
poll() -> 从队列中弹出头部元素
lockInterruptibly() ->加了一把可中断锁

延迟队列实现代码

/**
 * @author qinchen
 * @date 2021/6/17 14:27
 * @description 延迟队列数据对象
 */
public class Order implements Delayed {

    /**
     * 延迟时间
     */
    private Long delayTime;

    private String name;

    public Order(Long delayTime, String name) {
        this.delayTime = System.currentTimeMillis() + delayTime;
        this.name = name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {

        Order order = (Order) o;
        Long t = this.delayTime - order.delayTime;

        if( t > 0) {
            return 1;
        }

        if( t < 0) {
            return -1;
        }

        return 0;
    }

    public String getName() {
        return name;
    }
}
public class OrderConsumer implements Runnable{

    private DelayQueue<Order> queue;

    public OrderConsumer(DelayQueue<Order> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            try {
                Order take = queue.take();
                System.out.println("消费的订单名称:" + take.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
Order order1 = new Order(5000L, "Order1");
Order order2 = new Order(12000L, "Order2");
Order order3 = new Order(3000L, "Order3");

DelayQueue<Order> queue = new DelayQueue<>();

queue.offer(order1);
queue.offer(order2);
queue.offer(order3);

ExecutorService exec = new ThreadPoolExecutor(4, 8,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
exec.execute(new OrderConsumer(queue));
exec.shutdown();
06-21 08:58