前言
相关系列
- 《Java ~ Collection【目录】》(持续更新)
- 《Java ~ Executor【目录】》(持续更新)
- 《Java ~ Collection/Executor ~ PriorityBlockingQueue【源码】》(学习过程/多有漏误/仅作参考/不再更新)
- 《Java ~ Collection/Executor ~ PriorityBlockingQueue【总结】》(学习总结/最新最准/持续更新)
- 《Java ~ Collection/Executor ~ PriorityBlockingQueue【问题】》(学习解答/持续更新)
涉及内容
- 《Java ~ Collection【总结】》
- 《Java ~ Collection ~ Queue【总结】》
- 《Java ~ Collection/Executor ~ BlockingQueue【总结】》
- 《Java ~ Executor【总结】》
- 《Java ~ AQS ~ ReentrantLock【总结】》
- 《Java ~ Other ~ Comparator【总结】》
- 《Java ~ Other ~ Comparable【总结】》
一 PriorityBlockingQueue(优先级阻塞队列)类源码及机制详解
类
PriorityBlockingQueue(优先级阻塞队列)类是BlockingQueue(阻塞队列)接口的实现类之一,基于数组实现。优先级阻塞队列类不是标准的FIFO队列,元素会以小顶堆的规则被排序并存放在数组的相应位置中。因此,当在实际开发中对元素的顺序有特定要求时,可以使用优先级阻塞队列类。所谓的小顶堆本质是一类特殊的完全二叉树,其规则是父元素一定小于/等于两个子元素(子元素之间不要求大小比对)。类似的,当父元素大于/等于两个子元素时,就是所谓的大顶堆。在优先级阻塞队列类中,小顶堆以数组的形式保存。之所以可以使用数组来模拟小顶堆,是因为元素索引会以以下规则分布在数组上:
优先级阻塞队列类必须定义比较器或元素必须实现比较能力。优先级阻塞队列类是具备排序能力的队列,而众所周知比较是排序的基础条件,为了实现这一点,优先级阻塞队列类设计了两种方式进行元素间的比较:一是使用比较器,即Comparator(比较器)接口对象;二是通过元素自身的比较能力,即元素类必须实现Comparable(比较能力)接口。开发者必须至少实现两者当中的一个,否则将会在使用过程中抛出类转换异常。比较器相比比较能力而言拥有更高的优先级,通俗的说就是当两种比较方式都满足的情况下会优先使用比较器进行比较。
延迟队列类是无界队列,意味着其最大容量理论上只受限于堆内存的大小。延迟队列类底层使用优先级队列类实现,由于其扩容机制的存在,延迟队列类也被纳入无界队列的范围中。但虽说如此,优先级队列类在实现中还受到数组实现与int类型影响,因此延迟队列的最大容量实际上为Integer.MAX_VALUE。由于其无界队列的定义,为了掩盖实际实现中受到的限制,当其保存的元素总数触达上限时会模拟堆内存不足的场景手动抛出内存溢出错误。
优先级阻塞队列类是真正意义上的无界队列,即容量理论上只受限于堆内存的大小。基于数组实现的原因及出于节省内存的目的,优先级阻塞队列类内部存在扩容机制,以使得元素数组的长度能够更加贴合实际所需,故而优先级阻塞队列类存在初始容量的说法,可在创建时显式指定。如果在创建优先级阻塞队列时未显式指定初始容量,则会隐式设置其为默认初始容量11。当元素总数触达优先级阻塞数组的当前容量时会触发扩容。扩容的本质是创建长度更大的新元素数组来代替旧元素数组,并将旧元素数组中的元素迁移至新元素数组。容量的具体增长规则如下:
由于在具体实现上受到int类型的物理限制,因此虽说优先级阻塞队列类是无界队列,但实际最大容量仅可扩容至Integer.MAX_VALUE - 8。减8的原因是因为有些虚拟机会在数组中保存源数据(header words),故而特意留出了这部分空间。但话虽如此,优先级阻塞队列最多依然可能保存Integer.MAX_VALUE个元素。因为优先级阻塞队列类虽然限制了通过常规单插入使得容量超过Integer.MAX_VALUE - 8的可能,但却可以通过批量插入来突破这个限制。为了兼容使用元素总数超过Integer.MAX_VALUE - 8的集进行批量插入及创建优先级阻塞队列的情况,优先级阻塞队列类是允许这么做的,但后果是可能抛出内存溢出错误。
优先级阻塞队列类不允许存null,或者说阻塞队列接口的所有实现类都不允许存null。null被poll()及peek()方法作为优先级阻塞队列不存在元素的标记值,因此所有的阻塞队列接口实现类都不允许存null。
优先级阻塞队列类是线程安全的,或者说阻塞队列接口的所有实现类都是线程安全的,其接口定义中强制要求实现类必须线程安全。优先级阻塞队列类采用“单锁”线程安全机制,即使用单个ReentrantLock(可重入锁)锁来保证整体的线程安全。但与此同时其还添加了“无锁”线程安全机制来辅助扩容,即在元素数组扩容时使用CAS乐观锁保证线程安全的同时不阻塞移除方法的执行,该知识点会在下文详述。由于CAS乐观锁并不是真正意义上的锁,因此被称为“无锁”线程安全机制。
优先级阻塞队列类的迭代器是弱一致性的,即可能迭代到已移除的元素或迭代不到新插入的元素。优先级阻塞队列类的迭代器实现非常直接(或者说归于直接了),其会直接将数据拷贝一份快照存入生成的迭代器中以进行迭代。这么做的好处是迭代器的实现非常的简单,但缺点也明显,当优先级阻塞队列的元素总数较大或生成的迭代器数量较多时对内存的消耗会非常严重。优先级阻塞队列类使用快照来实现迭代器的原因是元素会因为排序而难以追踪其位置上变化,因此使用不变的快照是最好的做法。
优先级阻塞队列类虽然与阻塞队列接口一样都被纳入Executor(执行器)框架的范畴,但同时也是Collection(集)框架的成员。
/**
* An unbounded {@linkplain BlockingQueue blocking queue} that uses the same ordering rules as class {@link PriorityQueue} and supplies
* blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing
* {@code OutOfMemoryError}). This class does not permit {@code null} elements. A priority queue relying on {@linkplain Comparable
* natural ordering} also does not permit insertion of non-comparable objects (doing so results in {@code ClassCastException}).
* 一个使用与优先级队列相同排序规则的阻塞队列,并且提供可阻塞的找回操作。尽管队列是逻辑上的无界,尝试新增也可能由于(造成内存
* 溢出错误)资源耗尽而失败。这个类不允许null元素。一个优先级队列依赖比较自然排序,并且不允许插入非比较对象(这么做可能会导致类
* 型转换异常)
* <p>
* This class and its iterator implement all of the <em>optional</em> methods of the {@link Collection} and {@link Iterator} interfaces. The
* Iterator provided in method {@link #iterator()} is <em>not</em> guaranteed to traverse the elements of the PriorityBlockingQueue in
* any particular order. If you need ordered traversal, consider using {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
* can be used to <em>remove</em> some or all elements in priority order and place them in another collection.
* 这个类及它的迭代器实现了集接口与迭代器接口所有的可能方法。这个迭代器由iterator()方法提供以保证按任意特定的顺序遍历优先级
* 阻塞队列的元素。如果你需要有序的遍历,考虑使用Arrays.sort(pq.toArray())(进行排序)。并且drainTo()方法会移除一些元素在其
* 它集中按优先级顺序存放它们。
* <p>
* Operations on this class make no guarantees about the ordering of elements with equal priority. If you need to enforce an ordering, you
* can define custom classes or comparators that use a secondary key to break ties in primary priority values. For example, here is a class
* that applies first-in-first-out tie-breaking to comparable elements. To use it, you would insert a {@code new FIFOEntry(anEntry)} instead
* of a plain entry object.
* 对该类的操作无法保证元素按优先级排序。如果你需要强制排序,可以定义使用辅助键来打破主优先级值的自定义类或比较器。例如,这个
* 类打破了可比较元素的FIFO。要使用它,你需要插入一个{@code new FIFOEntry(anEntry)}而不是一个普通的条目对象。(通俗的说,就是
* 重写比较进行针对性的排序)。
* <pre> {@code
* class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {
* static final AtomicLong seq = new AtomicLong(0);
* final long seqNum;
* final E entry;
* public FIFOEntry(E entry) {
* seqNum = seq.getAndIncrement();
* this.entry = entry;
* }
* public E getEntry() { return entry; }
* // 比较
* public int compareTo(FIFOEntry<E> other) {
* // 先比较主要条件,再比较次要条件。
* int res = entry.compareTo(other.entry);
* if (res == 0 && other.entry != this.entry)
* res = (seqNum < other.seqNum ? -1 : 1);
* return res;
* }
* }}</pre>
*
* <p>
* This class is a member of the <a href="{@docRoot}/../technotes/guides/collections/index.html"> Java Collections Framework</a>.
* 这个类是Java集框架的成员
*
* @param <E> the type of elements held in this collection
* @author Doug Lea
* @Description: 优先级阻塞队列
* @since 1.5
*/
@SuppressWarnings("unchecked")
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 5595510919245408276L;
/*
* The implementation uses an array-based binary heap, with public operations protected with a single lock. However, allocation during
* resizing uses a simple spinlock (used only while not holding main lock) in order to allow takes to operate concurrently with allocation.
* This avoids repeated postponement of waiting consumers and consequent element build-up. The need to back away from lock during
* allocation makes it impossible to simply wrap delegated PriorityQueue operations within a lock, as was done in a previous version of
* this class. To maintain interoperability, a plain PriorityQueue is still used during serialization, which maintains compatibility at the
* expense of transiently doubling overhead.
* 该实现使用了一个基于数组的二进制堆,公共操作受保护于一个单锁。但是,在调整大小期间,分配(即在容量满了之后重新分配一个
* 新的数组,准确的说应该称为扩容更加合适)使用一个简单的自旋锁(只在未持有主锁的情况下使用),为了允许拿取与分配并发地操
* 作。这避免了等待中的消费者重复延迟(如果使用单锁,在扩容期间消费者将无法消费造成拿取者堆积)和随之而来的元素积聚。
*/
...
}
字段
DEFAULT_INITIAL_CAPACITY (默认初始容量) —— 默认初始容量
/**
* Default array capacity.
* 默认数组容量
*
* @Description: 默认初始容量
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
MAX_ARRAY_SIZE(最大数组容量) —— 分配数组的最大大小。
/**
* The maximum size of array to allocate. Some VMs reserve some header words in an array. Attempts to allocate larger arrays may result
* in OutOfMemoryError: Requested array size exceeds VM limit
* 分配数组的最大大小。一些虚拟机会在数组中保留一些头字(所以专门预留了8个空位来进行保存,数组的最大可用空间就是int的最大值
* -8)。试图分配更大的数组可能导致内存溢出错误:请求数组大小超过虚拟机限制。
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
queue(队列) —— 真正用于保存元素的元素数组。
/**
* Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The priority
* queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: For each node n in the heap and each
* descendant d of n, n <= d. The element with the lowest value is in queue[0], assuming the queue is nonempty.
* 优先级队列表示为一个平衡二进制堆:元素queue[n]的两个孩子是元素queue[2*n+1]和元素queue[2*(n+1)](这是一种通过数组来模拟完全
* 二叉树的做法。数组n位置表示的节点的两个子节点恰好是2*n+1和2*(n+1))。优先级队列通过比较排序,或通过元素自然排序。如果比较器
* 为null:为每个堆中的节点n和每个n的子孙d。最小的元素值在queue[0],假设队列不为空(即默认是小顶堆的实现,即父节点比子节点小,
* 因此根节点queue[0]是最小的;而大顶堆是父节点比子节点大,因此根节点queue[0]是最大的,通过修改比较器,是可以实现小顶堆到大顶
* 堆之间的转换的)。
*
* @Description: 队列:真正用于保存元素的元素数组。
*/
private transient Object[] queue;
size(大小) —— 元素的数量。
/**
* The number of elements in the priority queue.
* 优先级队列中元素的数量。
*
* @Description: 大小:元素的数量。
*/
private transient int size;
comparator(比较器) —— 用于进行比较排序,如果是元素的自然排序则为null。
/**
* The comparator, or null if priority queue uses elements' natural ordering.
* 比较器,如果优先级队列使用元素的自然排序则为null。
*
* @Description: 比较器:用于进行比较排序,如果是元素的自然排序则为null。
*/
private transient Comparator<? super E> comparator;
lock(锁) —— 用于保护所有的公共操作。
/**
* Lock used for all public operations
* 所有公共操作使用的锁。
*
* @Description: 锁:用于保护所有的公共操作。
*/
private final ReentrantLock lock;
notEmpty(非空) —— 当不存在元素时阻塞执行操作的拿取者。
/**
* Condition for blocking when empty
* 当为空时阻塞的条件
*
* @Description: 非空:当不存在元素时阻塞执行操作的拿取者。
*/
private final Condition notEmpty;
allocationSpinLock(分配自旋锁) —— 当进行扩容时保证扩容与移除/拿取可以同时执行。
/**
* Spinlock for allocation, acquired via CAS.
* 分配的自旋锁,通过CAS操作获取。
*
* @Description: 分配自旋锁:当进行扩容时保证扩容与移除/拿取可以同时执行。
*/
private transient volatile int allocationSpinLock;
q(优先级队列) —— 用于在序列化/反序列化时保证当前版本与之前版本的共存,因此也只在序列化/反序列化期间有值。
/**
* A plain PriorityQueue used only for serialization, to maintain compatibility with previous versions of this class. Non-null only during
* serialization/deserialization.
* 一个只在序列化时使用的普通的优先级队列,来维持类先前版本的共存。只在序列化和反序列化是不为空。
*
* @Description: 优先级队列:用于在序列化/反序列化时保证当前版本与之前版本的共存,因此也只在序列化/反序列化期间有值。
*/
private PriorityQueue<E> q;
构造方法
public PriorityBlockingQueue() —— 创建一个默认初始容量的优先级阻塞队列,按元素自身的比较器自然排序。
/**
* Creates a {@code PriorityBlockingQueue} with the default initial capacity (11) that orders its elements according to their {@linkplain
* Comparable natural ordering}.
* 创建一个默认初始容量11的优先级阻塞队列,其元素根据它们(元素)的比较器进行自然排序。
*
* @Description: 创建一个默认初始容量的优先级阻塞队列,按元素自身的比较器自然排序。
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) —— 创建一个指定初始容量的优先级阻塞队列,按元素自身的比较器自然排序。
/**
* Creates a {@code PriorityBlockingQueue} with the specified initial capacity that orders its elements according to their {@linkplain Comparable
* natural ordering}.
* 创建一个指定初始容量的优先级阻塞队列,,其元素根据它们(元素)的比较器进行自然排序。
*
* @param initialCapacity the initial capacity for this priority queue 优先级队列的初始容量
* @throws IllegalArgumentException if {@code initialCapacity} is less than 1
* 非法参数异常:如果初始容量小于1
* @Description: 创建一个指定初始容量的优先级阻塞队列,按元素自身的比较器自然排序。
*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) —— 创建一个指定初始容量的优先级阻塞队列,其元素根据指定的比较器进行排序。
/**
* Creates a {@code PriorityBlockingQueue} with the specified initial capacity that orders its elements according to the specified comparator.
* 创建一个指定初始容量的优先级阻塞队列,,其元素根据指定的比较器进行排序。
*
* @param initialCapacity the initial capacity for this priority queue 优先级队列的初始容量
* @param comparator the comparator that will be used to order this priority queue. If {@code null}, the {@linkplain Comparable natural
* ordering} of the elements will be used.
* 该比较器将用于排序这个优先级队列。如果为null,则使用元素的比较能力进行排序
* @throws IllegalArgumentException if {@code initialCapacity} is less than 1
* 非法参数异常:如果初始容量小于1
*/
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
// 实例化锁及条件,用于保护和线程阻塞。
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// 持有比较器与实例化元素数组。
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) —— 创建一个包含指定集中元素的优先级阻塞队列。如果指定集是一个排序集合或者优先级队列,该优先级队列将按相同的顺序进行排序。否则该优先级队列将根据元素的比较能力进行排序(因此在元素在优先级阻塞队列中的排序可能与指定集中的不同)。
/**
* Creates a {@code PriorityBlockingQueue} containing the elements in the specified collection. If the specified collection is a {@link SortedSet}
* or a {@link PriorityQueue}, this priority queue will be ordered according to the same ordering. Otherwise, this priority queue will be ordered
* according to the {@linkplain Comparable natural ordering} of its elements.
* 创建一个包含指定集中元素的优先级阻塞队列。如果指定集是一个排序集合或者优先级队列,该优先级队列将按相同的顺序进行排序。否则该
* 优先级队列将根据元素的比较能力进行排序(因此在元素在优先级阻塞队列中的排序可能与指定集中的不同)。
*
* @param c the collection whose elements are to be placed into this priority queue 集中的元素将存放与优先级队列
* @throws ClassCastException if elements of the specified collection cannot be compared to one another according to the priority queue's
* ordering
* 如果指定集的元素无法与根据优先级队列的顺序进行比较(如果有某个元素没有实现比较能力接口就会出现)
* @throws NullPointerException if the specified collection or any of its elements are null
* 空指针异常:如果指定集或集中的任意元素为null
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
// 实例化所与条件。
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// true if not known to be in heap order
// 如果不知道堆顺序,则为true
boolean heapify = true;
// true if must screen for nulls
// 如果必须筛选空值,则为true
boolean screen = true;
if (c instanceof SortedSet<?>) {
// 如果指定集是一个排序集合,则将排序集合的比较器作为优先级阻塞队列的比较器。
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
} else if (c instanceof PriorityBlockingQueue<?>) {
// 如果指定集是一个优先级阻塞队列,则将其比较器作为优先级阻塞队列的比较器。
PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
// exact match
// 精确匹配(instanceof只能判断传入的指定集是优先级阻塞队列的子类(包含本身),而只有通过类型的具体判断才能判断是否真的是优
// 先级阻塞队列。如果指定集是优先级阻塞队列的子类,则单纯获取到比较器是无法判断具体的排序的,因为谁也不知道子类的实现是怎样,
// 因此只有判断指定集就是优先级阻塞队列才能确定这一点)。
if (pq.getClass() == PriorityBlockingQueue.class)
heapify = false;
}
// 获取指定集的元素数组拷贝。
Object[] a = c.toArray();
int n = a.length;
// 如果指定集的类型不是数组列表,则返回的元素数组拷贝可能因为协变的原因不是Object,因此需要再进行一次拷贝确保没有问题。
if (c.getClass() != ArrayList.class)
a = Arrays.copyOf(a, n, Object[].class);
// 如果需要筛选null值,并且元素总数为1或者比较为不为null,则直接进行null值的扫描。screen为true,说明指定集中可能包含null元素,需要
// 进行筛选。如果指定集中只有一个元素,则该元素很有可能因为无需进行比较而遗漏了空检查,从而使的一个null元素加入了优先级阻塞队列。
// 如果比较器不为null,则由于比较其本身是可能允许null元素的比较的,因此也需要进行筛选。
if (screen && (n == 1 || this.comparator != null)) {
// 判断是否有元素为null。
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// 将赋值元素数组和大小。
this.queue = a;
this.size = n;
// 将数据进行堆化,本质就是进行排序。该操作是针对那些没有强制性排序需求的集的。
if (heapify)
heapify();
}
方法
private void tryGrow(Object[] array, int oldCap) —— 尝试成长 —— 对元素数组进行扩容。扩容可以分为三步:分配新元素数组、赋值新元素数组及数据迁移。其中,新元素数组会在乐观锁的保护下进行,以避免阻塞正常的移除/拿取和插入/放置。而赋值新元素数组及数据迁移这两部则必须在主锁(悲观锁)的的保护下进行。
/**
* Tries to grow array to accommodate at least one more element (but normally expand by about 50%), giving up (allowing retry) on contention
* (which we expect to be rare). Call only while holding lock.
* 尝试成长数组以容纳最少一个更多元素(但是通常会拓展50%),竞争时放弃(允许重试)(我们认为这种(竞争)情况很少见) 。只在持有
* 锁的情况下调用。
*
* @param array the heap array 堆数组
* @param oldCap the length of the array 数组的长度
* @Description: 尝试成长:对元素数组进行扩容。扩容可以分为三步:分配新元素数组、赋值新元素数组及数据迁移。其中,新元素数组会在乐
* @Description: 观锁的保护下进行,以避免阻塞正常的移除/拿取和插入/放置。而赋值新元素数组及数据迁移这两部则必须在主锁(悲观锁)的
* @Description: 的保护下进行。
*/
private void tryGrow(Object[] array, int oldCap) {
// must release and then re-acquire main lock
// 必须释放并重新获取主锁(在扩容结束后)
// 解锁
lock.unlock();
Object[] newArray = null;
// 加乐观锁,即通过CAS操作将allocationSpinLock从0修改1。如果成功则分配新元素数组,否则说明有其它线程在分配新元素数组,直接跳过
// 这个流程。
if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
// grow faster if small
// 如果小就快速成长(如果元素数组的大小小于64,则直接在原来的基础上新增一倍,否则新增0.5倍)
int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
// possible overflow
// 可能的溢出
// 如果新的容量 - 最大数组大小 > 0,说明新容量已经超过了最大数组大小,甚至有可能已经超过了int类型的最大值,在这种情况下,要判
// 断旧容量能否继续增大,即旧容量是否小于最大数组大小。如果是则直接增大至最大数组大小;否则直接抛出内存溢出异常。
if (newCap - MAX_ARRAY_SIZE > 0) {
int minCap = oldCap + 1;
// minCap < 0,说明旧容量本身就已经达到了int类型的最大值。这确实是可能的,虽然一般不会出现这种情况。但如果在创建优先级阻塞
// 队列的时候使用的是集构造方法,并且参数集中包含的元素数量等于int类型的最大值,那确实能产生这种情况。因此,对于旧容量能否
// 继续增大需要分两种情况进行判断:一是旧容量 <= 最大数组大小;二是旧容量 > 最大数组大小。
// 对于第一种情况,如果旧容量已经达到了最大数组大小,则就不允许在增大了,要抛出内存溢出异常;否则增大至最大数组大小。
// 对于第二种情况,由于旧容量本身已经大于最大数组大小,因此不允许在增大了,抛出内存溢出异常。
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// newCap > oldCap:意味着容量真正增大了;
// queue == array:说明在分配新元素数组期间没有其它线程完成扩容。
// 满足上述两个条件的情况下,实例化新的元素数组。
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
// back off if another thread is allocating
// 如果其它线程在分配则后退。
// 如果当前线程没有获得分配新元素数组的权限(即获取乐观锁失败);或者因为旧容量成长后超过了int类型的最大值;又或者是在此期间已经
// 有线程完成了扩容。总之,只要分配新元素数组失败,则放弃当前线程持有的CPU资源。目的是为了让完成了新元素数组分配的线程可以优先
// 执行下列流程。
if (newArray == null)
Thread.yield();
// 将锁重新加起。在分配元素数组期间,由于并不涉及原元素数组的替换与数据的迁移,因此理论上不应该阻塞拿取者/放置者的并发,也因此,
// 分配新元素数组这个操作会在一个乐观锁的保护下进行,并不会在主锁的保护下进行,因此新元素数组的分配并不会阻塞移除/拿取操作。
lock.lock();
// 如果新元素数组分配成功,并且在此期间没有其它线程完成扩容(分配新元素数组 + 赋值新元素数组 + 旧数据迁移),则将新分配的元素数组
// 设置为元素数组,并完成数据的迁移。
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
private E dequeue() —— 出队 —— 移除/拿取元素数组中的第一个元素,也是排序最小的元素。元素移除后,会取元素数组的最后一个元素进行首位填充,并进行下排序,以保证整个元素排序及分布的正确性。
/**
* Mechanics for poll(). Call only while holding lock.
* poll()方法机制。只在持有锁的情况下调用。
*
* @Description: 出队:移除/拿取元素数组中的第一个元素,也是排序最小的元素。元素移除后,会取元素数组的最后一个元素进行首位填充,并进
* @Description: 行下排序,以保证整个元素排序及分布的正确性。
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
// 如果元素数组为空,则直接返回null。
return null;
else {
// 获取元素数组中的首个元素,用于返回。
Object[] array = queue;
E result = (E) array[0];
// 获取元素数组的最后一个元素,用于填充在首个索引位置,以被进行下排序。
E x = (E) array[n];
array[n] = null;
// 获取比较器,判断是通过比较器进行比较还是通过元素本身的比较能力进行比较。
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 通过元素本身的比较能力进行下排序。
siftDownComparable(0, x, array, n);
else
// 通过比较器进行下排序。
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static void siftUpComparable(int k, T x, Object[] array) —— 筛选上可比较的 —— 本质是循环性的上排序。所谓上排序就是判断指定索引位置的元素是否大于父元素,如果不是,则将之与父元素进行交换。然后将原父元素的索引位置作为新的指定索引位置重复这个过程。如果指定元素比两父元素要大,或者指定索引位置根节点,说明已经排序完毕,结束这个循环性的上排序。
/**
* Inserts item x at position k, maintaining heap invariant by promoting x up the tree until it is greater than or equal to its parent, or is the root.
* 在k位置插入一个项x,通过在树中对x进行提升直至其大于它的父节点或称为根节点来维持堆的不变性。
* <p>
* To simplify and speed up coercions and comparisons. the Comparable and Comparator versions are separated into different methods that are
* otherwise identical. (Similarly for siftDown.) These methods are static, with heap state as arguments, to simplify use in light of possible
* comparator exceptions.
* 简化和加速强制和比较。比较能力和比较器版本分离成不同的方法,其它完全一样。(类似于下排序。)这些方式是静态的,堆被声明为参数,以
* 便在可能出现比较器异常时简化使用(啥意思?)。
*
* @param k the position to fill 填充的位置
* @param x the item to insert 查询的项
* @param array the heap array 堆数组
* @Description: 筛选上可比较的:本质是循环性的上排序。所谓上排序就是判断指定索引位置的元素是否大于父元素,如果不是,则将之与父元素
* @Description: 进行交换。然后将原父元素的索引位置作为新的指定索引位置重复这个过程。如果指定元素比两父元素要大,或者指定索引位置根
* @Description: 节点,说明已经排序完毕,结束这个循环性的上排序。
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private static void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) —— 筛选上使用比较器 —— 该方法的逻辑与 siftUpComparable(int k, T x, Object[] array)完全相同,只是元素之间的比较变为了采用比较器的方式完成,而不是元素自身的比较能力。
/**
* @Description: 筛选上使用比较器:该方法的逻辑与 siftUpComparable(int k, T x, Object[] array)完全相同,只是元素之间的比较变为了采用
* @Description: 比较器的方式完成,而不是元素自身的比较能力。
*/
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
private static void siftDownComparable(int k, T x, Object[] array, int n) —— 筛选下可比较的 —— 本质是循环性的下排序。所谓下排序就是判断指定索引位置的元素是否小于两个孩子元素,如果不是,则将之与两个孩子元素之中较小的一个进行交换。然后将较小孩子节点的索引位置作为新的指定索引位置重复这个过程。如果指定元素比两孩子元素都要小,或者指定索引位置是一个叶子节点,说明已经排序完毕,结束这个循环性的下排序。
/**
* Inserts item x at position k, maintaining heap invariant by demoting x down the tree repeatedly until it is less than or equal to its children or is
* a leaf.
* 在k的位置插入项x,通过重复地将x从树中降级直至其小于或等于它的孩子或是一个叶子(节点)。
*
* @param k the position to fill 填充的位置
* @param x the item to insert 插入的项
* @param array the heap array 堆数组
* @param n heap size 堆大小
* @Description: 筛选下可比较的:本质是循环性的下排序。所谓下排序就是判断指定索引位置的元素是否小于两个孩子元素,如果不是,则将之与
* @Description: 两个孩子元素之中较小的一个进行交换。然后将较小孩子节点的索引位置作为新的指定索引位置重复这个过程。如果指定元素比两
* @Description: 孩子元素都要小,或者指定索引位置是一个叶子节点,说明已经排序完毕,结束这个循环性的下排序。
*/
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>) x;
int half = n >>> 1;
// loop while a non-leaf
// 只要不是叶子节点就继续循环
while (k < half) {
// assume left child is least
// 假设左孩子是最小的
// 获取左孩子的索引值,不用担心孩子的索引会超出范围,方法在调用前只会选择元素数组的前半段元素。
int child = (k << 1) + 1;
Object c = array[child];
// 获取右孩子的索引值。
int right = child + 1;
// 比较左右两个孩子的大小,将更小的用于和指定元素进行对比。
if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// 将两个孩子中更小的那个与指定元素进行比较,如果指定元素更小,说明已无法进行下排序,排序结束;否则继续循环进行下排序,直
// 至排序到叶子节点或者指定元素比两个孩子节点都小。
if (key.compareTo((T) c) <= 0)
break;
// 将更小的那个孩子元素存放于指定的位置,随后将原孩子元素的索引位置作为下次下排序的指定位置,该操作就相当于进行了一次下排序。
array[k] = c;
k = child;
}
array[k] = key;
}
}
private static void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) —— 筛选下使用比较器 —— 该方法的逻辑与siftDownComparable(int k, T x, Object[] array, int n)完全相同,只是元素之间的比较变为了采用比较器的方式完成,而不是元素自身的比较能力。
/**
* @Description: 筛选下使用比较器:该方法的逻辑与siftDownComparable(int k, T x, Object[] array, int n)完全相同,只是元素之间的比较变为了
* @Description: 采用比较器的方式完成,而不是元素自身的比较能力。
*/
private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
// 不用担心孩子的索引会超出范围,方法在调用前只会选择元素数组的前半段元素进行降级。
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
private void heapify() —— 堆化 —— 即按照比较器或元素的比较能力进行顶堆排序。
/**
* Establishes the heap invariant (described above) in the entire tree, assuming nothing about the order of the elements prior to the call.
* 在整个树中建立堆不变量(如上所述),在调用之前不假设元素的顺序(即该方法调用之前无需判断元素数组本身的顺序)。
*
* @Description: 堆化:即按照比较器或元素的比较能力进行顶堆排序。
*/
private void heapify() {
Object[] array = queue;
int n = size;
// 获取元素总数的中间值,因为对整个元素数组进行堆化排序只需要排序前一半元素即可,后一半会在前一半的排序中自动完成。
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
for (int i = half; i >= 0; i--)
// 如果没有比较器,则通过元素自身的比较能力进行堆化排序。
siftDownComparable(i, (E) array[i], array, n);
} else {
// 如果存在比较器,则通过比较器比较进行堆化排序。
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
public boolean add(E e) —— 新增 —— 向向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。该方法是插入/放置方法中"异常"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则会抛出非法状态异常。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量理论上只受限于堆内存的大小,故而不会抛出常规的非法状态异常,而只会在堆内存不足时抛出内存溢出异常。在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的非法状态异常,以造成堆内存不足的假象。
/**
* Inserts the specified element into this priority queue.
* 新增指定元素至优先级队列
*
* @param e the element to add 新增的元素
* @return {@code true} (as specified by {@link Collection#add}) true
* @throws ClassCastException if the specified element cannot be compared with elements currently in the priority queue according to the
* priority queue's ordering
* 类转换异常:如果指定元素无法在优先级队列中根据优先级队列的排序与元素正确的比较(一般是因为没有实现比较能力接口)。
* @throws NullPointerException if the specified element is null
* 空指针异常:如果指定元素为null
* @Description: 新增:向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。
* @Description: 该方法是插入/放置方法中"异常"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则会抛出非法状态
* @Description: 异常。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量
* @Description: 理论上只受限于堆内存的大小,故而不会抛出常规的非法状态异常,而只会在堆内存不足时抛出内存溢出异常。
* @Description: 在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),
* @Description: 当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的非法状态异常,以造成堆内存不足的
* @Description: 假象。
*/
public boolean add(E e) {
// 由于优先级阻塞队列是一个无界队列,因此理论上不会出现容量不足的情况,因此该方法不会因为容量不足而出现异常...
// 事实上,虽然都是无界队列,优先级阻塞队列的实现与链接阻塞队列显然是完全不同的。链接阻塞队列的无界用法是有真正意义上的容量的,
// 因此方法运行都与有界队列无异。但是优先级阻塞队列则是可扩容的实现,理论上可以无限增大,唯一的限制是堆内存的大小。但在实际实
// 现中显然不可能,因为它还受大小字段int类型的限制。因此,当优先级阻塞队列保存的元素总数达到上限时,需要给开发者营造出是因为堆
// 内存的限制而无法继续存放的假象,故而当元素总数达到上限,在下一次扩容时会直接抛出内存溢出错误,而不是正常的非法状态异常。
return offer(e);
}
public boolean offer(E e) —— 提供 —— 向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。该方法是插入/放置方法中"特殊值"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则失败返回false。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量理论上只受限于堆内存的大小,故而永远不会返回false,而只会在堆内存不足时抛出内存溢出异常。在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的返回false,以造成堆内存不足的假象。
/**
* Inserts the specified element into this priority queue. As the queue is unbounded, this method will never return {@code false}.
* 新增指定元素至优先级队列。由于队列时无界的,该方法永远不会返回false。
*
* @param e the element to add 新增的元素
* @return {@code true} (as specified by {@link Collection#add}) true
* @throws ClassCastException if the specified element cannot be compared with elements currently in the priority queue according to the
* priority queue's ordering
* 类转换异常:如果指定元素无法在优先级队列中根据优先级队列的排序与元素正确的比较(一般是因为没有实现比较能力接口)。
* @throws NullPointerException if the specified element is null
* 空指针异常:如果指定元素为null
* @Description: 提供:向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。
* @Description: 该方法是插入/放置方法中"特殊值"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则失败返回false。
* @Description: 虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量理论上
* @Description: 只受限于堆内存的大小,故而永远不会返回false,而只会在堆内存不足时抛出内存溢出异常。
* @Description: 在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),
* @Description: 当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的返回false,以造成堆内存不足的假象。
*/
public boolean offer(E e) {
// 元素空判断。
if (e == null)
throw new NullPointerException();
// 加锁。
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 循环性质的进行扩容,直至容量变大或抛出内存溢出错误。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 将新的元素插入数组的末尾,并通过比较能力或比较器进行元素的上排序,维持整个数组的排序。
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 递增大小,并唤醒可能因为没有元素而挂起的放置者。
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public void put(E e)—— 新增 —— 向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。该方法是插入/放置方法中"阻塞"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则阻塞至有空余容量。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量理论上只受限于堆内存的大小,故而永远阻塞,而只会在堆内存不足时抛出内存溢出异常。在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的阻塞,以造成堆内存不足的假象。
/**
* Inserts the specified element into this priority queue. As the queue is unbounded, this method will never block.
* 插入指定元素至优先级队列。由于队列是无界的,该方法永远不会阻塞。
*
* @param e the element to add 新增的元素
* @throws ClassCastException if the specified element cannot be compared with elements currently in the priority queue according to the
* priority queue's ordering
* 类转换异常:如果指定元素无法在优先级队列中根据优先级队列的排序与元素正确的比较(一般是因为没有实现比较能力接口)。
* @throws NullPointerException if the specified element is null
* 空指针异常:如果指定元素为null
* @Description: 放置:向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。
* @Description: 该方法是插入/放置方法中"阻塞"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则阻塞至有空余
* @Description: 容量。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量
* @Description: 理论上只受限于堆内存的大小,故而永远阻塞,而只会在堆内存不足时抛出内存溢出异常。
* @Description: 在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),
* @Description: 当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的阻塞,以造成堆内存不足的假象。
*/
public void put(E e) {
// never need to block
// 永远无需阻塞
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) —— 提供 —— 向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。该方法是插入/放置方法中"超时"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则在指定的等待时间内阻塞至有空余容量,超出等待时间则返回false。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻塞队列不同,是动态扩容的方式,因此最大容量理论上只受限于堆内存的大小,故而永远不会阻塞或返回false,而只会在堆内存不足时抛出内存溢出异常。在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的阻塞或返回false,以造成堆内存不足的假象。
/**
* Inserts the specified element into this priority queue. As the queue is unbounded, this method will never block or return {@code false}.
* 插入指定元素至优先级队列。由于队列是无界的,该方法永远不会阻塞或返回false。
*
* @param e the element to add 新增的元素
* @param timeout This parameter is ignored as the method never blocks
* @param unit This parameter is ignored as the method never blocks
* @return {@code true} (as specified by {@link BlockingQueue#offer(Object, long, TimeUnit) BlockingQueue.offer}) true
* @throws ClassCastException if the specified element cannot be compared with elements currently in the priority queue according to the
* priority queue's ordering
* 类转换异常:如果指定元素无法在优先级队列中根据优先级队列的排序与元素正确的比较(一般是因为没有实现比较能力接口)。
* @throws NullPointerException if the specified element is null
* 空指针异常:如果指定元素为null
* @Description: 提供:向优先级阻塞队列的队尾添加指定元素,优先级阻塞队列会根据排序规则通过上排序将指定元素排序到队列中合适的位置。
* @Description: 该方法是插入/放置方法中"超时"形式的实现,当优先级阻塞队列存在空余容量时插入/放置成功并返回true;否则在指定的等待
* @Description: 时间内阻塞至有空余容量,超出等待时间则返回false。虽说定义如此,但实际由于优先级阻塞队列作为无界队列的实现与链接阻
* @Description: 塞队列不同,是动态扩容的方式,因此最大容量理论上只受限于堆内存的大小,故而永远不会阻塞或返回false,而只会在堆内存
* @Description: 不足时抛出内存溢出异常。
* @Description: 在真正的实现中,由于size字段的存在,因此容量实际上还受int类型的限制。为了与理论达成一致(容量只受限于堆内存的大小),
* @Description: 当元素总数大于等于指定的数量上限时,下次扩容会直接抛出内存溢出错误,而不是正常的阻塞或返回false,以造成堆内存不足
* @Description: 的假象。
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
// never need to block
return offer(e);
}
public E poll() —— 轮询 —— 从优先级阻塞队列的队头移除元素,后将队尾元素将之填充,并通过下排序维护优先级阻塞队列中的元素顺序。该方法是移除/拿取方法中"特殊值"形式的实现,当优先级阻塞队列存在元素时移除/拿取成功并返回元素;否则返回null。
/**
* @Description: 轮询:从优先级阻塞队列的队头移除元素,后将队尾元素将之填充,并通过下排序维护优先级阻塞队列中的元素顺序。该方法是
* @Description: 移除/拿取方法中"特殊值"形式的实现,当优先级阻塞队列存在元素时移除/拿取成功并返回元素;否则返回null。
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException —— 拿取 —— 从优先级阻塞队列的队头移除元素,后将队尾元素将之填充,并通过下排序维护优先级阻塞队列中的元素顺序。该方法是移除/拿取方法中"阻塞"形式的实现,当优先级阻塞队列存在元素时移除/拿取成功并返回元素;否则阻塞至有元素。
/**
* @Description: 拿取:从优先级阻塞队列的队头移除元素,后将队尾元素将之填充,并通过下排序维护优先级阻塞队列中的元素顺序。该方法是
* @Description: 移除/拿取方法中"阻塞"形式的实现,当优先级阻塞队列存在元素时移除/拿取成功并返回元素;否则阻塞至有元素。
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 当优先级阻塞队列中不存在元素时,将拿取者进行挂起,直至被放置者唤醒。
while ((result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException —— 轮询 —— 从优先级阻塞队列的队头移除元素,后将队尾元素将之填充,并通过下排序维护优先级阻塞队列中的元素顺序。该方法是移除/拿取方法中"超时"形式的实现,当优先级阻塞队列存在元素时移除/拿取成功并返回元素;否则阻塞指定的等待时间至有元素,超出等待时间则返回null。
/**
* @Description: 轮询:从优先级阻塞队列的队头移除元素,后将队尾元素将之填充,并通过下排序维护优先级阻塞队列中的元素顺序。该方法是
* @Description: 移除/拿取方法中"超时"形式的实现,当优先级阻塞队列存在元素时移除/拿取成功并返回元素;否则阻塞指定的等待时间至有元
* @Description: 素,超出等待时间则返回null。
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 当元素中不存在元素时将拿取者挂起,直至被唤醒或超时。
while ((result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
public E peek() —— 窥视 —— 获取优先级阻塞队列的队头获取元素。该方法是检查方法中"特殊值"形式的实现,当优先级阻塞队列存在元素时返回元素;否则返回null。
/**
* @Description: 窥视:获取优先级阻塞队列的队头获取元素。该方法是检查方法中"特殊值"形式的实现,当优先级阻塞队列存在元素时返回元素;
* @Description: 否则返回null。
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
public int size() —— 大小 —— 获取元素总数。
/**
* @Description: 大小:获取元素总数。
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
public int remainingCapacity() —— 剩余容量 —— 获取剩余容量。由于优先级阻塞队列是无界队列,因此会永远返回Integer.MAX_VALUE。
由于延迟队列是无界队列,容量理论上只受限于堆内存的大小(实际上还有int类型的限制),因此永远返回Integer.MAX_VALUE。
/**
* Always returns {@code Integer.MAX_VALUE} because a {@code PriorityBlockingQueue} is not capacity constrained.
* 永远返回Integer.MAX_VALUE因为优先级阻塞队列不受容量限制。
*
* @return {@code Integer.MAX_VALUE} always 永远Integer.MAX_VALUE
* @Description: 剩余容量:获取剩余容量。由于优先级阻塞队列是无界队列,因此会永远返回Integer.MAX_VALUE。
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
public int drainTo(Collection<? super E> c)—— 流失 —— 将优先级阻塞队列中所有的元素都迁移到指定集中。
/**
* @throws UnsupportedOperationException {@inheritDoc} 不支持操作异常
* @throws ClassCastException {@inheritDoc} 类转化异常
* @throws NullPointerException {@inheritDoc} 空指针异常
* @throws IllegalArgumentException {@inheritDoc} 非法参数异常
* @Description: 流失:将优先级阻塞队列中所有的元素都迁移到指定集中。
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) —— 流失 —— 将优先级阻塞队列中最多指定数量的元素都迁移到指定集中。
/**
* @throws UnsupportedOperationException {@inheritDoc} 不支持操作异常
* @throws ClassCastException {@inheritDoc} 类转化异常
* @throws NullPointerException {@inheritDoc} 空指针异常
* @throws IllegalArgumentException {@inheritDoc} 非法参数异常
* @Description: 流失:将优先级阻塞队列中最多指定数量的元素都迁移到指定集中。
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 确定要迁移的元素数量。
int n = Math.min(size, maxElements);
// 将允许按优先级阻塞队列的出队顺序依次加入指定集中,每次移除一个元素都要通过下排序重新对优先级阻塞队列进行排序。
for (int i = 0; i < n; i++) {
// In this order, in case add() throws.
// 按当前顺序,
c.add((E) queue[0]);
dequeue();
}
return n;
} finally {
lock.unlock();
}
}
public Comparator<? super E> comparator() —— 比较器 —— 获取用于排序元素的比较器。
/**
* Returns the comparator used to order the elements in this queue, or {@code null} if this queue uses the {@linkplain Comparable natural ordering}
* of its elements.
* 返回在队列中用于排序元素的比较器,如果队列使用它元素的比较能力则返回null
*
* @return the comparator used to order the elements in this queue, or {@code null} if this queue uses the natural ordering of its elements
* 返回在队列中用于排序元素的比较器,如果队列使用它元素的比较能力则返回null
* @Description: 比较器:获取用于排序元素的比较器。
*/
public Comparator<? super E> comparator() {
return comparator;
}
private int indexOf(Object o)—— 索引关于 —— 获取指定元素在元素数组中的索引值,如果不存在则返回-1。
/**
* @Description: 索引关于:获取指定元素在元素数组中的索引值,如果不存在则返回-1。
*/
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -1;
}
private void removeAt(int i) —— 移除所在 —— 将指定索引位置的元素移除,并通过下排序和上排序维护整个元素数组的顺序。
/**
* Removes the ith element from queue.
* 从队列中移除第i个元素。
*
* @Description: 移除所在:将指定索引位置的元素移除,并通过下排序和上排序维护整个元素数组的顺序。
*/
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i)
// removed last element
// 移除最后一个元素
array[i] = null;
else {
// 获取优先级阻塞队列的最后元素,该元素时用于填充被移除的元素的。
E moved = (E) array[n];
array[n] = null;
// 将最后元素覆盖至指定索引位置,随后通过比较器或比较能力进行下排序以维护整个元素数组的顺序。
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
// 如果在下排序后,指定索引位置的元素就是覆盖的最后元素,则说明并没有发生移动。则该元素很有可能比指定索引位置的父节点要小,
// 因此再进行上排序。
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
public boolean remove(Object o) —— 移除 —— 移除一个指定元素的单例,移除成功返回true;否则返回false。
/**
* Removes a single instance of the specified element from this queue, if it is present. More formally, removes an element {@code e} such that
* {@code o.equals(e)}, if this queue contains one or more such elements. Returns {@code true} if and only if this queue contained the specified
* element (or equivalently, if this queue changed as a result of the call).
* 如果存在,从队列中移除一个指定元素的单例。更正式的,应该通过o.equals(e)来判断队列包含一个或更多指定元素来进行移除。当且仅当
* 队列包含指定元素(或等效地,如果调用的结果是队列发生改变)时返回true。
*
* @param o element to be removed from this queue, if present 如果存在,从队列中移除的元素
* @return {@code true} if this queue changed as a result of the call 如果调用的结果是队列发生改变则返回true
* @Description: 移除:移除一个指定元素的单例,移除成功返回true;否则返回false。
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取指定元素在元素数组中的索引,如果为-1则表示不存在。
int i = indexOf(o);
if (i == -1)
return false;
// 移除指定索引位置的元素,并通过上排序和下排序来维护整个元素数组的顺序。
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
void removeEQ(Object o) —— 移除EQ —— 移除指定元素。
/**
* Identity-based version for use in Itr.remove
* 基于身份的版本,用于Itr.remove
*
* @Description: 移除EQ:移除指定元素。
*/
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 遍历整个元素数组,找到指定元素进行移除,并通过上/下排序来维护整个元素数组的顺序。
Object[] array = queue;
for (int i = 0, n = size; i < n; i++) {
if (o == array[i]) {
removeAt(i);
break;
}
}
} finally {
lock.unlock();
}
}
public boolean contains(Object o) —— 包含 —— 判断优先级阻塞队列是否包含指定元素,是则返回true;否则返回false。
/**
* Returns {@code true} if this queue contains the specified element. More formally, returns {@code true} if and only if this queue contains at
* least one element {@code e} such that {@code o.equals(e)}.
* 如果队列包含指定元素则返回true。更正式的,当且仅当通过o.equals(e)判断队列包含最少一个(指定)元素是返回true。
*
* @param o object to be checked for containment in this queue 在队列中检查包含的元素
* @return {@code true} if this queue contains the specified element 如果队列包含指定的元素则返回true
* @Description: 包含:判断优先级阻塞队列是否包含指定元素,是则返回true;否则返回false。
*/
public boolean contains(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取指定的元素的索引,如果不为-1,则表示存在。
return indexOf(o) != -1;
} finally {
lock.unlock();
}
}
public void clear() —— 清除 —— 清空优先级阻塞队列中的所有元素。
/**
* Atomically removes all of the elements from this queue. The queue will be empty after this call returns.
* 原子性的移除队列中的所有元素。该调用返回后队列将为空。
*
* @Description: 清空:清空优先级阻塞队列中的所有元素。
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 遍历数组,将所有的元素置null,并将大小设置为0。
Object[] array = queue;
int n = size;
size = 0;
for (int i = 0; i < n; i++)
array[i] = null;
} finally {
lock.unlock();
}
}
public Object[] toArray() —— 转化数组 —— 返回一个包含优先级阻塞队列所有元素的数组。该数组是重新分配的,因此增删不会影响优先级阻塞队列。数组中的元素是直接按元素数组中的原始顺序排列的,并不是优先级阻塞队列的元素出队顺序。
/**
* Returns an array containing all of the elements in this queue. The returned array elements are in no particular order.
* 返回一个包含队列中所有元素的数组。返回的数组元素没有特定的顺序。
* <p>
* The returned array will be "safe" in that no references to it are maintained by this queue. (In other words, this method must allocate a new
* array). The caller is thus free to modify the returned array.
* 返回的数组是"安全的",队列中未维护任何指向该数组的引用。(换句话说,该方法必须分配一个新数组)。调用者因此可以自由的修改返
* 回的数组。
* <p>
* This method acts as bridge between array-based and collection-based APIs.
* 方法行为可作为基于数组和基于集的API(沟通的)桥梁。
*
* @return an array containing all of the elements in this queue 包含队列中所有元素的数组
* @Description: 转化数组:返回一个包含优先级阻塞队列所有元素的数组。该数组是重新分配的,因此增删不会影响优先级阻塞队列。数组中
* @Description: 的元素是直接按元素数组中的原始顺序排列的,并不是优先级阻塞队列的元素出队顺序。
*/
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return Arrays.copyOf(queue, size);
} finally {
lock.unlock();
}
}
public T[] toArray(T[] a) —— 转化数组 —— 返回一个包含优先级阻塞队列所有元素的数组。该数组是重新分配的,因此增删不会影响优先级阻塞队列。数组中的元素是直接按元素数组中的原始顺序排列的,并不是优先级阻塞队列的元素出队顺序。如果传入的数组长度足够,则直接将该数组返回;否则会新分配第一个指定类型且大小与优先级阻塞队列相同的数组用于返回。
/**
* Returns an array containing all of the elements in this queue; the runtime type of the returned array is that of the specified array. The
* returned array elements are in no particular order. If the queue fits in the specified array, it is returned therein. Otherwise, a new array is
* allocated with the runtime type of the specified array and the size of this queue.
* 返回一个包含队列中所有元素的数组;返回数组的运行时类型是指定数组(的运行时类型)。返回的数组元素没有特定的顺序。如果队列
* 符合指定的数组(意思是指定数组的长度足以容下队列的所有元素),则返回指定数组,否则分配一个指定数组的运行时类型与队列大小
* 的新数组。
* <p>
* If this queue fits in the specified array with room to spare (i.e., the array has more elements than this queue), the element in the array
* immediately following the end of the queue is set to {@code null}.
* 如果队列符合指定数组并且有所空余(即,数组拥有与队列更多的元素(应该说长度更合适,毕竟数组里面不一定有元素)),数组中的元
* 素直接跟随队列的重点设置为null(即将迁移后的最后元素的后一个索引位置设置为null)。
* <p>
* Like the {@link #toArray()} method, this method acts as bridge between array-based and collection-based APIs. Further, this method
* allows precise control over the runtime type of the output array, and may, under certain circumstances, be used to save allocation costs.
* 类似于toArray()方法,该方法行为可作为基于数组和基于集的API(沟通的)桥梁。此外,这种方法允许对输出数组的运行时类型进行精
* 确控制,并且在某些情况下,可以用于节省分配成本。
* <p>
* Suppose {@code x} is a queue known to contain only strings. The following code can be used to dump the queue into a newly allocated array
* of {@code String}:
* 假设一个队列已知只包含字符串。下述代码可用于倾泻队列之一个新分配的字符串数组:
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
* <p>
* Note that {@code toArray(new Object[0])} is identical in function to {@code toArray()}.
* 注意:toArray(new Object[0])在功能上等同于toArray()(其实还是不一样的,如果队列中没有元素,前者不会新分配数组)。
*
* @param a the array into which the elements of the queue are to be stored, if it is big enough; otherwise, a new array of the same runtime type
* is allocated for this purpose
* 如果足够大,用于保存队列中所有元素的数组;否则按计划分配一个相同运行时类型的新数组
* @return an array containing all of the elements in this queue 包含队列中所有元素的数组
* @throws ArrayStoreException if the runtime type of the specified array is not a supertype of the runtime type of every element in this queue
* 数组保存异常:如果指定数组的运行时类型不是队列中每个元素的运行时类型的超类
* @throws NullPointerException if the specified array is null
* 空指针异常:如果指定元素为null
* @Description: 转化数组:返回一个包含优先级阻塞队列所有元素的数组。该数组是重新分配的,因此增删不会影响优先级阻塞队列。数组中
* @Description: 的元素是直接按元素数组中的原始顺序排列的,并不是优先级阻塞队列的元素出队顺序。如果传入的数组长度足够,则直接将
* @Description: 该数组返回;否则会新分配第一个指定类型且大小与优先级阻塞队列相同的数组用于返回。
*/
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = size;
if (a.length < n)
// Make a new array of a's runtime type, but my contents:
// 创建一个a的运行时类型的新数组,但我的内容:
// 如果传入的数组长度不够,直接通过拷贝返回一个包含所有元素且类型相同的泛型数组。
return (T[]) Arrays.copyOf(queue, size, a.getClass());
// 如果传入的数组的长度足够,则通过拷贝将队列中的元素拷贝到指定数组中。
System.arraycopy(queue, 0, a, 0, n);
// 将后一位元素设置为null,表示结束。
if (a.length > n)
a[n] = null;
return a;
} finally {
lock.unlock();
}
}
public Iterator iterator() —— 迭代器 —— 获取一个优先级阻塞队列的迭代器。
/**
* Returns an iterator over the elements in this queue. The iterator does not return the elements in any particular order.
* 返回一个遍历队列中元素的迭代器。该迭代器不按任何特定的顺序返回元素(即不按出队顺序)。
* <p>
* The returned iterator is <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
* 返回的迭代器是弱一致性的。
*
* @return an iterator over the elements in this queue 一个遍历队列中元素的迭代器
*/
public Iterator<E> iterator() {
// 直接将当前时间的元素数组拷贝作为迭代器可迭代的元素...这实现真容易...丫的数组阻塞队列能有这什么简单不就好了。
return new Itr(toArray());
}
二 Itr类源码及机制详解
类
/**
* Snapshot iterator that works off copy of underlying q array.
* 快照迭代器,该迭代器工作于基础q数组的副本。
*
* @Description: 迭代器类:直接将优先级阻塞队列的元素快照保存起来进行遍历,由此可以完全不受有优先级阻塞队列的影响。
*/
final class Itr implements Iterator<E> {
...
}
字段
array(数组) —— 持有优先级阻塞队列元素数组的快照。
// Array of all elements
// 所有元素的数组
final Object[] array;
cursor(游标) —— 记录下个元素在数组中的索引。
// index of next element to return
// 下个返回元素的索引
int cursor;
currentElement(当前元素) —— 用于持有当前节点中容纳的元素。由于节点可能会被移除/拿取(包括中途移除),这种情况下节点都会被转化空节点,因此元素需要额外保存。
lastRet(上个索引) —— 记录上个元素在数组中的索引。
// index of last element, or -1 if no such
// 上个元素的索引,如果没有这样的(操作)则为-1。
int lastRet;
构造方法
Itr(Object[] array)—— 初始化优先级阻塞队列的迭代器,保存元素数组。
Itr(Object[] array) {
// 直接优先级阻塞队列的元素数组快照保存起来,因此迭代器中保存所有要迭代的元素。
lastRet = -1;
this.array = array;
}
方法
public boolean hasNext() —— 有下个 —— 判断是否存在下个可迭代的元素。如果存在返回true;否则返回false。
/**
* @Description: 存在下个:判断是否存在下个可迭代的元素。
*/
public boolean hasNext() {
// 通过判断索引值是否合法来判断是否存在可迭代的元素。
return cursor < array.length;
}
public E next() —— 有下个 —— 返回下个迭代的元素。
/**
* @Description: 下个:返回下个迭代的元素。
*/
public E next() {
// 如果索引值不合法,直接抛出异常。
if (cursor >= array.length)
throw new NoSuchElementException();
// 将游标设置为上个索引,用于执行迭代器的移除操作。
lastRet = cursor;
// 将游标在元素数组快照中的对应元素返回,并对游标进行递增。
return (E) array[cursor++];
}
public void remove() —— 移除 —— 移除上个迭代的元素。
/**
* @Description: 移除:移除上个迭代的元素。
*/
public void remove() {
// 判断是否进行过迭代或上个迭代的元素是否已经被移除,如果是,则抛出异常。
if (lastRet < 0)
throw new IllegalStateException();
// 找到元素数组快照中保存的上个元素,在元素数组中寻找并移除这个元素(如果这个元素还存在的话)。
removeEQ(array[lastRet]);
// 将上个索引赋值为-1,表示已移除。因此当上个索引的值为-1时有两种可能,一是尚未迭代,二是上个迭代的元素已被移除。
lastRet = -1;
}