前言

接下来继续BlockingQueue的另一个实现,优先级阻塞队列PriorityBlockingQueue。PriorityBlockingQueue是一个无限容量的阻塞队列,由于容量是无限的所以put等入队操作其实不存在阻塞,只要内存足够都能够立即入队成功,当然多个入队操作的线程之间还是存在竞争唯一锁的互斥访问。虽然PriorityBlockingQueue逻辑上是无界的,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。

该队列也不允许放入null值,它使用与类java.util.PriorityQueue 相同的排序规则,也不允许放入不可比较的对象,这样做会导致ClassCastException。

值得注意的是,虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。注意peek方法永远只获取且不删除第一个元素,所以多次调用peek都是返回同样的值。

PriorityBlockingQueue其实是通过Comparator来排序的,要么入队的元素实现了Comparator接口(即所谓的自然排序),要么构造PriorityBlockingQueue实例的时候传入一个统一的Comparator实例,如果两者兼备那么以后者为准。PriorityBlockingQueue不保证具有相同优先级的元素顺序,但是你可以定义自定义类或比较器,通过辅助属性来决定优先级相同的元素的顺序,后文会举例说明。

PriorityBlockingQueue的实现使用了基于数组的平衡二叉堆(小的在上,大的在下方,即最小堆),使用单个ReentrantLock锁来保护公共操作,它虽然被设计成无界的,但是初始容量只有11,会随着元素的入队空间不够用再扩容,在扩容的时候使用了一个单独的自旋锁而不需要一直占用ReentrantLock锁,以保证消费线程的take等出队操作能够同时进行,避免了消费线程的反复延迟和随之而来的其它入队操作的元素积累,该类虽然有用到java.util.PriorityQueue,但是只是用于本队列的序列号与反序列化,以兼容老版本的实现,但是这种兼容性的维护将以开销瞬间翻倍为代价。所以没有学习过java.util.PriorityQueue也不影响对PriorityBlockingQueue的理解。

在队列不为空的任意时刻,PriorityBlockingQueue只保证队列中的第一个节点即head是当前最小或者是最高优先级的节点,peek、take、poll都是直接返回该head,至于head出队之后谁最小或者说谁最该被优先出队是由take或者poll方法在返回之前通过平衡二叉堆算法排序得出的,这个节点有可能就是队列中的某一个,也可能是在这段时间之内新入队的一个具有更高优先级的节点。

源码解析

先来看看PriorityBlockingQueue的成员属性:

 /**
* Default array capacity. 默认的数组容量。
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11; /**
* The maximum size of array to allocate. 要分配的数组的最大大小。
* Some VMs reserve some header words in an array. 一些vm在数组中保留一些头信息
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
尝试分配更大的数组可能会导致OutOfMemoryError:请求的数组大小超过VM限制
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
lowest value is in queue[0], assuming the queue is nonempty. 优先级队列表现为一个平衡的二进制堆:queue[n]的两个子队列是queue[2*n+1]和queue[2*(n+1)]。 优先级队列通过comparator或元素的自然顺序排序,如果comparator为null:对于堆中的每个节点n和n的每个子节点d, n <= d。假设队列非空,则queue[0]中的值最小。 */
private transient Object[] queue; /**
* The number of elements in the priority queue. 优先级队列中的元素个数。
*/
private transient int size; /**
* The comparator, or null if priority queue uses elements'
* natural ordering. 比较器,如果优先级队列使用元素的自然顺序排序,则为null。
*/
private transient Comparator<? super E> comparator; /**
* Lock used for all public operations 用于所有公共操作的锁
*/
private final ReentrantLock lock; /**
* Condition for blocking when empty
*/
private final Condition notEmpty; /**
* Spinlock for allocation, acquired via CAS. 用于分配的自旋锁,通过CAS获取。
*/
private transient volatile int allocationSpinLock; /**
* A plain PriorityQueue used only for serialization,
* to maintain compatibility with previous versions
* of this class. Non-null only during serialization/deserialization.
一个普通的PriorityQueue,仅用于序列化,以保持与该类以前版本的兼容性。
仅在序列化/反序列化期间非空。
*/
private PriorityQueue<E> q;

成员属性很简单一目了然,PriorityBlockingQueue理论上队列的容量最大可以到 Integer.MAX_VALUE - 8 (减8是因为某些JVM实现会在数组中保存对象头信息),Object[] queue 这个对象数组就是实际用来存放队列元素的,这一点和ArrayBlockingQueue类似都是采用数组存放队列元素,但是PriorityBlockingQueue不是先进先出队列,实际采用了平衡二叉堆的方式来存放,这个怎么理解呢,虽然数组是一个一维数据结构,但是它的存储顺序则是按二叉树从上到下,从左到右的顺序依次存放到数组中的。该平衡二叉树有一个根节点即queue[0]也是最小的元素, 根节点下面有两个子节点,左子节点对应queue[1],右子节点对应queue[2],然后左右子节点又各自有自己的两个子节点,依次类推就将该二叉树与数组对应起来了。平衡二叉堆中的每一个节点都 小于等于 它的两个子节点。

然后是构造方法,PriorityBlockingQueue有三个构造方法,可以指定初始容量大小、统一的比较器Comparator实例和初始集合元素。初始容量小于1将会抛出异常,前两个构造方法都很简单,只分析第三个构造方法:

 /**
* Creates a {@code PriorityBlockingQueue} containing the elements
* in the specified collection. If the specified collection is a
* {@link SortedSet} or a {@link PriorityQueue}, this
* priority queue will be ordered according to the same ordering.
* Otherwise, this priority queue will be ordered according to the
* {@linkplain Comparable natural ordering} of its elements.
创建包含指定集合中的元素的PriorityBlockingQueue。
如果指定的集合是SortedSet或PriorityQueue,则此优先级队列将按照相同的顺序排序。
否则,此优先级队列将根据其元素的自然顺序进行排序。
*
* @param c the collection whose elements are to be placed
* into this priority queue
* @throws ClassCastException if elements of the specified collection
* cannot be compared to one another according to the priority
* queue's ordering 如果指定集合中的元素不能根据优先级队列的顺序相互比较
* @throws NullPointerException if the specified collection or any
* of its elements are null 如果指定的集合或其任何元素为空抛出NullPointerException
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order 需要重新排序,即堆化。
boolean screen = true; // true if must screen for nulls 空值校验
if (c instanceof SortedSet<?>) { //SortedSet
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) { //PriorityBlockingQueue
PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class) //如果c.toArray不正确返回Object[],复制它。
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) { //n == 1 什么鬼???
for (int i = 0; i < n; ++i)
if (a[i] == null) //空值校验
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify(); //堆化,即需要根据比较器重新排序,不考虑调用之前元素的顺序。
}

这个构造方法的大概逻辑就是如果传入的集合本身是具有特定的排序规则例如SortedSet、PriorityBlockingQueue那么在转换成数组之后,在可能的null元素校验通过之后直接将queue指向该数组,对于非SortedSet、PriorityBlockingQueue类型的集合,最后还需要进行堆化,即根据元素的自然排序重新生成平衡二叉堆结构对应的数组。总之就是需要按照PriorityBlockingQueue的排序规则重新组织数组元素的顺序。

 /**
* Establishes the heap invariant (described above) in the entire tree,
* assuming nothing about the order of the elements prior to the call.
假设在调用之前元素的顺序没有任何变化,
*/
private void heapify() {
Object[] array = queue;
int n = size;
int half = (n >>> 1) - 1; //非叶子节点
Comparator<? super E> cmp = comparator;
if (cmp == null) {
for (int i = half; i >= 0; i--) //遍历所有非叶子节点
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}

堆化的过程由heapify方法完成,该方法通过从下到上遍历每一个非叶子节点(源码中所有 小于half的节点都是非叶子节点)执行siftDownComparable或者siftDownUsingComparator,这两个方法的逻辑是一样的,不同的一个使用的元素自己实现的比较器方法,另一个使用统一的比较器实例来进行比较得出优先级。这两个方法也是PriorityBlockingQueue的精髓之一,我们来了解一下这个方法:

siftDownComparable/siftDownUsingComparator

 /**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
将项x插入到位置k处,通过重复地将x从树中降级来保持堆不变,直到它小于或等于其子元素或叶节点。
*
用x元素与k的最小的子节点比较(如果左子节点比右子节点大,就拿右子节点),如果x小或相等就将x设置为k位置的元素,否则k的最小子节点升级为父节点。
如果是k的子节点小,继续用刚才升级的子节点(如果是右节点小那就是右子节点)的子节点与x比较,找到最小的一个填充因为k的最小子元素升级空出来的位置,
直到将x元素插到某个非叶子节点或者已经到达叶子节点了,那么就将x插入最后升级为父节点的叶子节点处。 * @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param n heap size
*/
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) { //队列不为空
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf 非叶节点时执行循环
while (k < half) { // k 属于非叶子节点。
int child = (k << 1) + 1; // assume left child is least(假设左子结点最小) k的左子节点
Object c = array[child]; // 左子节点元素
int right = child + 1; // 右子节点索引
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right]; //如果左子节点比右子节点大,就转到更小的右子节点。
if (key.compareTo((T) c) <= 0)
break; //x比k的最小的子节点都小,退出
array[k] = c; //子节点就是最小的了,将其子节点升级为父节点。
k = child; //继续以子节点为父节点往下找子节点的子节点
}
array[k] = key; //将x插入到k
}
}

siftDownComparable方法所要表达的意思其方法doc只有一句话:将项x插入到位置k处,通过重复地将x从树中降级来保持堆不变,直到它小于或等于其子元素或叶节点。第7到第9行是我的理解,说白了就是尝试将x元素插入到k位置,但是如果k的某个子节点更小,就把该子节点提升到k位置,x当然就只能尝试插入到被提升的那个子节点出,同理如果该子节点的子节点有比x更小的,那只好有又把更小的孙子提上来,x继续往下,以此类推,最终要么x在中间找到一个合适的位置插入,要么顺着那条路径下来所有的子孙节点都比x小,那x肯定只能插入到最末端能够子节点因为升级而空下来的位置了。

举例说明,构造方法传入的ArrayList [5, 1, 4, 8, 7, 9, 0, 6, 3, 2, 9],那么在没有调用堆化方法之前,queue数组中的顺序就是一样的,对应的平衡二叉树如下:

Java同步数据结构之PriorityBlockingQueue-LMLPHP即从上到下,从左到右的顺序。堆化heapify就是从下到上循环遍历所有非叶子节点,本例的非叶子节点调用顺序分别是:⑦、⑧、④、①、⑤。

以倒数第一个非叶子节点⑦执行siftDownComparable(4, 7, array, 11)为例,左子节点② < 右子节点⑨,就拿左子节点与⑦比较,子节点小,所以②升级到k的位置,⑦插入原来②的位置,已经到最底了所以结束,最终变成这样:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

然后,倒数第二个非叶子节点⑧执行siftDownComparable(3, 8, array, 11),左子节点⑥ > 右子节点③,就拿右子节点与⑧比较,又是子节点小,所以③升级到k位置,⑧插入到原来③的位置,又到最底了,最终变成:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

其次,倒数第三个非叶子节点④执行siftDownComparable(2, 4, array, 11),左子节点⑨ > 右子节点零,就拿右子节点与④比较,又是子节点小,所以零升级到k位置,④插入到原来零的位置,又到最底了,最终变成:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

再次,倒数第四个非叶子节点①执行siftDownComparable(1, 1, array, 11),左子节点③ > 右子节点②,就拿右子节点与①比较,这次是①更小,所以不做交换将①就放到原位k退出siftDownComparable方法,最终变成无变化。

最后,到处第五个非叶子节点⑤执行siftDownComparable(0, 5, array, 11),左子节点① < 右子节点零,就拿右子节点与⑤比较,子节点零更小,所以零升级到k位置,⑤插入到原来零的位置,k也移动到被升级的零的位置即2,注意这时候⑤还是处于非叶子节点处,所以继续看它新的子节点,左子节点⑨ > 右子节点④,就拿右子节点与⑤比较,又是子节点小,所以④再次升级到k(k==2)的位置,⑤插入到原来④的位置,这时候已经到达叶子节点了,退出siftDownComparable方法,并且这次已经是顶层根节点了,所以堆化方法heapify也结束,最终的结构变成:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

即,最终的数组queue中的元素顺序是:[0, 1, 4, 3, 2, 9, 5, 6, 8, 7, 9],可以看到入队后的元素并没有完成最后真正的排序,如果你通过迭代器迭代的话,反应的元素顺序也是这样的乱序(只能保证head节点是最小的节点)。只有通过take等从队列头部依次出队的元素才会是真正想要的顺序(当然drainTo方法转移出的集合元素的顺序也是被最终排序的),因为执行出队方法的时候还会进行一次排序,暂不表,我们继续看PriorityBlockingQueue的其它方法。

入队方法add/put/offer

入队方法最终都是调用的offer,我们直接看offer方法的源码:

 /**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never return {@code false}.
* 将指定的元素插入此优先级队列。由于队列是无界的,因此该方法永远不会返回false。 * @param e the element to add
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
如果无法根据优先级队列的顺序将指定的元素与当前优先级队列中的元素进行比较。 * @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap); //队列中的实际元素个数已经满了,扩容。
try {
//从 n(最末尾) 开始往上找一个刚好比它小或者相等的祖父节点,
//然后将其插入到该祖父节点的下方(作为其子节点)
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1; //size加1
notEmpty.signal(); //唤醒可能阻塞的消费线程
} finally {
lock.unlock();
}
return true;
}

入队操作offer方法有两个重要的地方,第一个就是如果队列满了size(实际元素个数) >= queue.length (当前数组长度),那么要进行扩容,直到扩容成功或者抛出OutOfMemoryError;第二个就是入队的实现siftUpComparable/siftUpUsingComparator方法。

先看扩容方法:

 /**
* Tries to grow array to accommodate at least one more element
* (but normally expand by about 50%), giving up (allowing retry)
* on contention (which we expect to be rare). Call only while
* holding lock.
尝试扩展数组以容纳至少一个以上元素(但通常扩展50%),在争用时放弃(允许重试)(我们希望这种情况很少见)。
只有在持有锁的状态下才调用。
*
* @param array the heap array
* @param oldCap the length of the array
*/
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock 必须释放并重新获取主锁。
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) { //CAS 获取扩容的能力,标记我要扩容了
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small 如果容量比较小扩的多一些。
(oldCap >> 1));//本身已经超过64了,每次只扩张一半
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 超出了最大限制
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError(); //已经最大了,抛出OutOfMemoryError
newCap = MAX_ARRAY_SIZE; //否则扩容到允许的最大限制
}
if (newCap > oldCap && queue == array) //扩容成功,并且当前的源数组没有被其他线程扩容。
newArray = new Object[newCap]; //创建一个新的数组
} finally {
allocationSpinLock = 0; //扩容完成,还原标记
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield(); //正好有另一个线程已经在扩容了,放弃CPU
lock.lock(); //真正要开始扩容才获取锁,
if (newArray != null && queue == array) {//新数组已经申请好,并且当前并没有被其他线程扩容
queue = newArray; //改变当前数组指向新的数组
System.arraycopy(array, 0, newArray, 0, oldCap); //拷贝元素
}
}

扩容的逻辑很简单,在计算出本次扩容的大小之前,先释放唯一的锁,使其它消费线程不至于被阻塞,使由于被消费,队列已经不满后随后的入队线程也不被阻塞,然后再通过一个allocationSpinLock竞争扩容的执行权,拿到这个标记的线程就可以计算本次应该被扩容到多大,否则就出让CPU;拿到标记的线程计算出新数组的长度之后,还是先要获取到唯一的锁之后才能执行数组扩容操作(即把原来的数组拷贝的新数组,使queue指向这个新数组)。有意思的是如果刚开始队列的容量小于64,那么每次扩张一倍,否则扩张原来的一半,毕竟越到后面资源越紧张了。扩容的精髓就是尽量不阻塞其它出入队操作,只有拷贝数据到新数组的时候才会阻塞其它操作。

再看,入队逻辑siftUpComparable/siftUpUsingComparator

上面介绍过的siftDownXXX方法是PriorityBlockingQueue的精髓之一,另一个就是这两个siftUpXXX方法了,这两组方法就是PriorityBlockingQueue实现的平衡二叉堆排序算法的核心。

 /**
* Inserts item x at position k, maintaining heap invariant by
* promoting x up the tree until it is greater than or equal to
* its parent, or is the root.
将项x插入到位置k处,通过向上提升x直到它大于或等于它的父元素或根元素,从而保持堆不变。
*
* To simplify and speed up coercions and comparisons. the
* Comparable and Comparator versions are separated into different
* methods that are otherwise identical. (Similarly for siftDown.)
* These methods are static, with heap state as arguments, to
* simplify use in light of possible comparator exceptions.
简化和加速强制和比较。Comparable和Comparator版本被分为不同的方法,这些方法在其他方面是相同的。
* 这些方法是静态的,使用堆状态作为参数,以便根据可能的comparator异常简化使用。 siftUp就是从k开始往上找一个刚好比它小或者相等的祖父节点,然后将其插入到该祖父节点的下方(作为其子节点)。
在寻找的过程中也会将比它大的祖父节点的位置下移。 * @param k the position to fill 填补位置
* @param x the item to insert 要插入的项
* @param array the heap array 堆数组
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; //父节点索引
Object e = array[parent]; //父节点元素
if (key.compareTo((T) e) >= 0) //找到一个比key小或者相等的父节点跳出循环
break;
array[k] = e; //父节点下移
k = parent; //继续往上找父节点的父节点
}
array[k] = key; //将x插入到比它小或者相等的父节点下方(作为其子节点)。
}

siftUpComparable方法就是尝试将x插入到k位置,如果x 比 k位置节点的父节点大,x理所应当的占据k位置,否则,将该父节点降级到k位置,x上升到刚才父节点的位置,然后继续往上比较它新的父节点,如果x依然比此时的新的父节点更小,继续将该新父节点降级到x的位置,x又升级到该位置,直到x发现它大于等于其父节点或者x已经跑到顶变成根节点才结束。其实就是一个x从最后一个元素开始上冒的一个过程。以上面的queue状态[0, 1, 4, 3, 2, 9, 5, 6, 8, 7, 9]为例,此时如果offer(3),即入队一个3,会怎样呢?

根据offer方法,这时候容量刚好已经等于默认初始容量,所以先要扩容一倍,变成11 + (11+2)=24. 然后执行siftUpUsingComparator(11, 3, array),将3插入11的位置,即二叉树最下面的一排,②已经又两个子节点了,⑨还没有子节点,就插到⑨的下面作为其左子节点先:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

然后③与其父节点⑨比较,③更小,所以③和⑨换位,变成:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

继续看新的父节点④,还是③更小,所以③和④再换位,变成:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

最后,③和其父节点零比较,③更大了,所以③就占据该位置,结束。最终数组变成[0, 1, 3, 3, 2, 4, 5, 6, 8, 7, 9, 9],siftUpXXX的算法就是先尝试将x插入k处,并通过向上冒泡来找到一个x真正合适的位置。运用到入队操作offer,就是将新元素插入队列的末尾,然后通过向上冒泡来调整自己的位置。入队操作能够保证queue[0]即head节点是最小的节点,次优先级节点将随着head节点的出队而确立。

出队方法take/poll

除了peek是因为它是获取但不删除数组的第一个元素,所以同时多次调用peek都是返回同一个值即head,例如上面的数组,无论你连续执行多少次peek,得到的都是queue[0] 即0,任意时刻队列只保证queue[0]节点即head是最小的节点,所以peek拿到的节点就是当前队列中最小的节点即最高优先级的节点. 而take和poll在将head节点出队的同时还会将次优先级的节点提升到head处供下一次出队的时候使用,在队列为空时,poll()立即返回null,其它阻塞方法则等待队列不为空或超时,真正的出队逻辑都调用的dequeue方法:

 /**
* Mechanics for poll(). Call only while holding lock. 只有在锁定状态下才调用
*/
private E dequeue() {
int n = size - 1;
if (n < 0) //已经没有元素了
return null;
else {
Object[] array = queue;
E result = (E) array[0]; //取最开头的元素
E x = (E) array[n]; //最后一个元素
array[n] = null; //将最后一个元素位置置空
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n; //修改size
return result;
}
}

出队的逻辑:先直接拿走第一个位置queue[0]的元素,因为队列已经保证在入队的时候,最顶端的根节点就是最小的元素,但是其它节点不一定是按顺序排好的,拿走第一个元素之后,直接把最后一个元素也拿出来,然后调用siftDownXXX方法,尝试把最后那个元素放到queue[0]位置,最后一个元素当然90%的情况下不是最小的,所以siftDownXXX会不断将其降级来把真正最小的元素提升到queue[0]这个位置。

以上面的例子[0, 1, 3, 3, 2, 4, 5, 6, 8, 7, 9, 9]举例,第一次调用take() 返回0,然后把最后一个元素⑨放到queue[0] 的位置:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

然后,⑨的左子节点① < 右子节点③,就拿左子节点① 与 ⑨比较,左子节点更小,所以交换①和⑨交换位置:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

其次,⑨的左子节点③ > 右子节点②,就拿右子节点② 与 ⑨比较,右子节点更小,所以交换②和⑨交换位置:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

最后,⑨的左子节点 ⑦< 右子节点⑨,就拿左子节点⑦ 与 ⑨比较,左子节点更小,所以交换⑦和⑨交换位置:

Java同步数据结构之PriorityBlockingQueue-LMLPHP

自此,已经到达叶子节点了,siftDownXXX方法结束,take方法才真正返回,所以每一次take执行的时候,拿走最小的queue[0]元素时,还会将剩下的元素中最小的一个冒泡到queue[0]位置,以备下一次出队的时候使用,依次类推,每次调用take的时候才会准备下一次的返回,而不是入队的时候就已经排好序,这样做可能为了节省不必要的执行。至此,PriorityBlockingQueue的出入队逻辑已经清楚了。

remove(Object)

另一个比较复杂的方法就是内部删除remove(Object),它先通过indexOf(Object)遍历拿到对应的索引,然后再通过removeAt(index)的方法来真正执行删除逻辑,直接看removeAt:

 /**
* Removes the ith element from queue.从队列中删除第i个元素
*/
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // removed last element 删除最后一个元素
array[i] = null;
else {
E moved = (E) array[n]; //最后一个有效元素
array[n] = null; //将最后一个有效元素的位置置空
Comparator<? super E> cmp = comparator; //尝试将最后一个元素覆盖i位置,并向下降级来调整位置。
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp); //如果向下无可调整,那就需要向上冒泡再次尝试调整。
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}

remove的时候,如果删除的是最后一个元素的时候直接将queue[i] 置为空,如果删除的不是最后一个元素那么就拿走最后一个元素,先执行siftDown直接将最后一个元素覆盖到i位置,然后通过siftUp向下降级来调整位置,如果无可调整即最后一个元素还在i位置,那么还要向上冒泡来调整自己的位置。同时运用了siftDown和siftUp来调整二叉堆的结构。

其它方法就简单了,就不一一列举了。由于PriorityBlockingQueue的无界的,所以调用remainingCapacity()永远返回Integer.MAX_VALUE。toArray方法是拷贝的当前数组中的元素到新的数组返回,所以返回的数组没有排序,并且不会对源队列产生任何影响,drainTo方法再执行元素转移的时候,每转移一个元素都执行了dequeue方法对二叉堆进行了排序,所以drainTo最后返回的数组的排好序的。drainTo传入的数组会被队列元素覆盖,而且超出队列长度的位置都将被重置为null。

迭代器

PriorityBlockingQueue的迭代器是无优先级顺序的,并且当原队列元素有变化的时候,迭代器也不能得到同步更新,因为构造迭代器的实例的时候是直接拷贝了原数组的元素产生一个新的数组来进行遍历:

public Iterator<E> iterator() {
return new Itr(toArray()); //toArray拷贝数组元素创建新数组
}

这样的迭代器只能反映当时的一个队列快照,用处应该不是很大。值得注意的是,虽然该迭代器脱离了原队列的同步更新,但是通过迭代器的remove方法却可以删除原队列中的元素:

public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]); //移除队列中上一次迭代的元素。
lastRet = -1;
}

迭代器remove方法中调用的removeEG方法属于是属于 PriorityBlockingQueue的方法,它会尝试到真正的队列中通过remove(Object)删除对应的元素。

可拆分迭代器spliterator

public Spliterator<E> spliterator() {
return new PBQSpliterator<E>(this, null, 0, -1);
}

PBQSpliterator是PriorityBlockingQueue的内部类实现,下面是它的源码:

 static final class PBQSpliterator<E> implements Spliterator<E> {
final PriorityBlockingQueue<E> queue;
Object[] array;
int index;
int fence; PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array,
int index, int fence) {
this.queue = queue;
this.array = array;
this.index = index;
this.fence = fence;
} final int getFence() {
int hi;
if ((hi = fence) < 0) //只有顶层迭代器的fence为-1
hi = fence = (array = queue.toArray()).length; //hi=fence=array.length
return hi;
} public Spliterator<E> trySplit() {
int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
return (lo >= mid) ? null :
new PBQSpliterator<E>(queue, array, lo, index = mid);
} @SuppressWarnings("unchecked")
public void forEachRemaining(Consumer<? super E> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array) == null)
fence = (a = queue.toArray()).length;
if ((hi = fence) <= a.length &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((E)a[i]); } while (++i < hi);
}
} public boolean tryAdvance(Consumer<? super E> action) {
if (action == null)
throw new NullPointerException();
if (getFence() > index && index >= 0) {
@SuppressWarnings("unchecked") E e = (E) array[index++];//取第一个元素
action.accept(e);
return true;
}
return false;
} public long estimateSize() { return (long)(getFence() - index); }//当前迭代器的元素个数 public int characteristics() {
return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED;
}
}

PBQSpliterator迭代器一旦开始工作,也不能同步原队列数据的更新,因为他内部的实现使用了toArray()方法来产生新的数组,迭代器迭代的时候只会遍历该数组,而与原来的队列没有关系。可拆分迭代器的顺序也不是排序后的,这都和顺序迭代器iterator一致,迭代器拆分的时候返回的是同类型的PBQSpliterator迭代器,并且每次拆分之后形成的新的迭代器都会分走原迭代器数组中前一半的元素。

通过可拆分迭代器的characteristics方法也可以看出可拆分迭代器的特性:Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED,即迭代器不支持空值,不支持多线程修改原队列的数据同步(CONCURRENT),支持顶层迭代器的SIZE以及拆分后的子迭代器的SUBSIZED,因为迭代器要迭代的元素是固定不变的。

关于具有相同优先级的元素的顺序,PriorityBlockingQueue本身是不保证这种元素的顺序的,但是你可以定义一个包装类或者新的比较器来实现,下面这个例子中的两个元素Student和Person对象,本身是按照年龄age来排序的,当年龄相同的时候对于PriorityBlockingQueue就是具有相同优先级,这时候谁先谁后是无法预知的,但是我们可以重新定义一个包装类,依赖第二属性来处理这种具有相同优先级的元素的顺序:

 package com.Queue;

 import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong; /**
* <E extends Comparable> 是上限通配符,限定接受的类型E必须实现了 Comparable 接口。
* <? super E> 是下限通配符,限定接受的类型?必须是E的父类。
* <E extends Comparable<? super E>> 合起来表示接受实现了Comparable接口并且可以和其父类进行比较的类型E及其父类型。
* 说白了就是可以是父与父,子与子,父与子之间都可以进行比较的父子关系类型
*
* @param <E>
*/
public class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> { static final AtomicLong seq = new AtomicLong(0);
final long seqNum; final E entry; public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
} public E getEntry() {
return entry;
} public int compareTo(FIFOEntry<E> other) {
int res = entry.compareTo(other.entry);
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
} @Override
public String toString() {
return "FIFOEntry{" +
"seqNum=" + seqNum +
", entry=" + entry +
'}';
} public static void main(String[] args) throws Exception{ FIFOEntry tom = new FIFOEntry(new Student(12, "Tom")); FIFOEntry tony = new FIFOEntry(new Person(12, "Tony")); PriorityBlockingQueue<FIFOEntry> pbQueue = new PriorityBlockingQueue<>(); pbQueue.offer(tony);
pbQueue.offer(tom); System.out.println(pbQueue.take());//FIFOEntry{seqNum=0, entry=Student{age=12, name='Tom'}}
System.out.println(pbQueue.take());//FIFOEntry{seqNum=1, entry=Person{age=12, name='Tony'}} }
} class Person implements Comparable<Person> { Integer age; String name; public Person(Integer age, String name) {
this.age = age;
this.name = name;
} @Override
public int compareTo(Person o) {
return this.age.compareTo(o.age);
} @Override
public String toString() {
return "Person{" +
"age=" + age +
", name='" + name + '\'' +
'}';
}
} class Student extends Person { public Student(Integer age, String name) {
super(age, name);
} @Override
public String toString() {
return "Student{" +
"age=" + age +
", name='" + name + '\'' +
'}';
}
}

上例中的决定排序顺序的第二属性seq是一个静态原子变量,所以谁先构造FIFOEntry实例谁就优先级更高,因此虽然入队的顺序是tony、tom,但是出队的顺序始终是tom优先级更高。

java.util.PriorityQueue

顺便介绍一下这个类吧,从名字也可以看出来,PriorityBlockingQueue是java.util.PriorityQueue的线程安全版本,加入了阻塞等待操作,其实它们的内部实现都是一样的,都是采用基于数组的平衡二叉堆实现,除了线程安全你完全可以把它们看成就是一样的东西。

除此之外,它们两者实现的iterator迭代器以及可拆分迭代器是不一样的,由于PriorityQueue被设计非线程安全的,所以它们的迭代器都不是像PriorityBlockingQueue那样为了保证线程安全而将原数组拷贝出来脱离原队列进行遍历,而是直接遍历的是原始队列中的数组,所以在实现iterator迭代器的remove的时候就要更小心一些,例如:

PriorityQueue的迭代器在做remove的时候,假设迭代器当前迭代的位置是n,那么remove将移除n-1位置的元素,通过上面的源码可以知道在remove的时候,会尝试将最后一个元素插入到被移除的元素的位置,先siftDown尝试在后面找个合适的位置,如果没有找到再通过siftUp向上找一个合适的位置,如果siftUp成功在上面找到一个合适的位置,那么意味着最后那个还没有被迭代器迭代的元素跑到当前迭代器已经迭代过的位置上去了,如果不记录下来,迭代器将遗漏它,因此PriorityQueue的迭代器在遇到这种情况的时候用了一个forgetMeNot队列变量来记录,当迭代器迭代完原始数组之后还会依次拿出这个队列中记录的元素。

总结

PriorityBlockingQueue是一个无界阻塞队列,但是容量是随着放入的元素空间不够来逐步扩容的,只支持放入可比较的对象即直接或间接实现了Comparator接口的元素,PriorityBlockingQueue内部使用基于数组的平衡二叉堆来存储队列元素,元素入队之后内部数组中元素顺序除了queue[0]是最小的并不保证其它节点是按优先级排好序的(所以peek拿到的元素就是当前时刻最小或者最高优先级的元素),而take、poll出队的时候直接拿走queue[0]即head节点,然后将剩下的节点中的最小的节点提升为head,所以只有通过take、poll方法或drainTo出队的元素序列才是真正按优先级排过序的,不论是iterator迭代器还是可拆分迭代器迭代的顺序都不是按优先级排序的,并且迭代器都不是与原队列的变化同步的。

PriorityBlockingQueue与PriorityQueue采用了同样的排序算法即平衡二叉堆,但是PriorityBlockingQueue是线程安全的,它并没有依赖任何PriorityQueue的方法,仅仅是为了兼容性而在队列序列以及反序列化的时候将其按照PriorityQueue来存储。PriorityQueue虽然和PriorityBlockingQueue采用了相同的算法实现,可以说是完全相同的内部机制,但是它们的迭代器是不一样的,PriorityQueue的迭代器直接迭代的原始队列数组,而PriorityBlockingQueue迭代器的数据是从原始队列数组中拷贝出来的,已经脱离了原始数据源而独立。

05-15 14:47