简介

基于优先级队列,以过期时间作为排序的基准,剩余时间最少的元素排在队首。只有过期的元素才能出队,在此之前,线程等待。

源码解析

属性

     private final transient ReentrantLock lock = new ReentrantLock(); // 可重入锁
private final PriorityQueue<E> q = new PriorityQueue<E>(); // 优先级队列
private Thread leader = null; // 领导者线程,成为领导者的线程,只会等待队首元素的delay时间,其他线程会一直等待,直到有线程唤醒它,一直等待的这些线程会伴随着一个个元素的出队相继成为leader,同一时刻,只有一个线程作为领导者等待一个delay时间
private final Condition available = lock.newCondition(); // 条件队列,阻塞的线程会被压入条件队列,等待被唤醒

构造方法

     public DelayQueue() {
} public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

关键方法

offer(E e)

     public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 获取锁
try {
q.offer(e); // 入队
if (q.peek() == e) { // 队首元素更新为新加入的元素
leader = null; // 上一个领导者线程同其他线程一样被同等对待(等着被唤醒),重新抢占leader,其实不必抢占(因为只有一个线程被唤醒),只不过leader线程等待的delay需要更新(新的队首元素的delay值)
available.signal(); // 只会唤醒Condition队列的队首线程
}
return true;
} finally {
lock.unlock(); // 释放锁
}
}

take()

     public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 响应中断
try {
for (;;) {
E first = q.peek(); // 取出队首元素
if (first == null) // 若为空
available.await(); // 说明队列为空,线程等待
else {
long delay = first.getDelay(NANOSECONDS); // 否则,获取first的delay值
if (delay <= 0) // 若已到期
return q.poll(); // 弹出此元素
first = null; // 线程等待时,不保持对元素的引用,
if (leader != null) // 领导者线程不为空,则等待
available.await();
else {
Thread thisThread = Thread.currentThread(); // 否则,设置当前线程为领导者线程
leader = thisThread;
try {
available.awaitNanos(delay); // 等待delay时间
} finally {
if (leader == thisThread) // 等待结束后,如果自己是领导者线程,重置leader为null
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null) // 如果自己是领导线程(上面把leader置为null了),并且有元素,则唤醒其他线程(其中一个,下一届的leader,如此循环下去)
available.signal();
lock.unlock(); // 释放锁
}
}

leader线程,leader-follower模式,当真省去了不必要的出队入队操作(出入的是Condition的队列,出入者是唤醒又马上要等待的线程)

每一届的leader线程对应当前的队首元素,当这届leader线程拿到元素后,会空出leader位置,并唤醒其中等待时间最长的线程成为下一届的leader,如此传递下去。

行文至此结束。

尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_delayqueue.html

05-11 13:22