public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
/** * 默認初始化數組長度11 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 容許的最大數組長度,減8是由於有可能部分虛擬機會用一部分空間來保存對象頭信息 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * * 優先級隊列經過平衡二叉堆實現,可類比堆排序算法 * 那麼queue[n]對應的左右子節點分別爲queue[2*n+1]和[2*(n+1)] * 隊列中的對象必須是可比較的,默認的天然排序或自行實現的的Comparator均可 * 隊列非空,則queue[0]爲最小值,即以最小二叉堆排序 */ private transient Object[] queue; /** * 優先級隊列queue包含的元素個數 */ private transient int size; /** * 比較器對象,在使用天然排序比較時爲null */ private transient Comparator<? super E> comparator; /** * 互斥鎖,只有一個ReentrantLock */ private final ReentrantLock lock; /** * 非空信號量,隊列爲空時阻塞出隊線程 * 只須要判斷隊列爲空的狀況,隊列沒有滿的狀況,因此纔是無限容量隊列 */ private final Condition notEmpty; /** * Spin鎖,經過CAS操做實現 */ private transient volatile int allocationSpinLock; /** * 在序列化中使用,爲了兼容老版本 */ private PriorityQueue<E> q;
// Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long allocationSpinLockOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = PriorityBlockingQueue.class; allocationSpinLockOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("allocationSpinLock")); } catch (Exception e) { throw new Error(e); } }
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); // 是否須要從新排序標識,即堆化標識 boolean heapify = true; // true if not known to be in heap order // 空值檢查標識 boolean screen = true; // true if must screen for nulls // 集合爲SortedSet,則使用其Comparator排序,因爲其已有序,直接複製便可,無需堆化操做 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } // 集合爲PriorityBlockingQueue,則使用其Comparator排序 else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; // 精確到PriorityBlockingQueue類,因爲其已有序,直接複製便可,無需堆化操做 if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. // 假如沒有正確返回Object[],則複製a if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); // 對集合進行空值校驗 // this.comparator != null 判斷SortedSet類型的對象空值狀況 // n == 1 這個感受應該是對天然排序的對象作的操做 n >= 1 纔對,不然就在heapify()堆化操做比較時拋錯,能夠本身嘗試放List if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; // 堆化操做 if (heapify) heapify(); }
private void tryGrow(Object[] array, int oldCap) { // 先釋放鎖 lock.unlock(); // must release and then re-acquire main lock // 準備的新數組 Object[] newArray = null; // 其餘線程未進行擴容操做時嘗試使用CAS更新allocationSpinLock標識爲1,成功則當前線程取得擴容操做權限 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 原數組容量小於64,則每次增加oldCap + 2 // 原數組容量大於等於64,則每次增加oldCap的一半 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 新的數組容量大於最大的數組長度限制 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; // 原數組容量加1就已經溢出或者超過最大長度限制直接拋出OOM if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); // 設置新數組容量爲最大值 newCap = MAX_ARRAY_SIZE; } // 擴容成功且當前數組沒有被其餘線程操做,則建立一個新數組 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { // 將擴容標識恢復 allocationSpinLock = 0; } } // 其餘線程已經在擴容了,讓出cpu if (newArray == null) // back off if another thread is allocating Thread.yield(); // 從新得到鎖 lock.lock(); // 擴容成功且原數組沒被其餘線程操做則複製原數組到新數組中 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
/** * @param k the position to fill * @param x the item to insert * @param array the heap array */ private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 找到k位置處的父節點 int parent = (k - 1) >>> 1; // 父節點對應的值 Object e = array[parent]; // 父子節點比較,k位置處的節點大於等於其父節點,則退出,不須要對堆進行調整了 if (key.compareTo((T) e) >= 0) break; // k位置處的節點小於其父節點,則將k位置處的值改成其父節點值 array[k] = e; // k指向其父節點,至關於堆向上遞進了一層,繼續while判斷其父節點是否須要調整 k = parent; } // 結束調整時k指向的位置即爲插入x的值,即key array[k] = key; } /** * 同上,區別在於這個使用了一個比較對象cmp,上邊是天然排序 */ private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
/** * @param k the position to fill * @param x the item to insert * @param array the heap array * @param n heap size */ private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; // 最後的非葉子節點位置 int half = n >>> 1; // loop while a non-leaf while (k < half) { // k的左子節點 int child = (k << 1) + 1; // assume left child is least // k的左子節點對應的值 Object c = array[child]; // k的右子節點 int right = child + 1; // 左右子節點中最大值 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; // 對比,其節點值小於等於其左右子節點,則代表堆調整完畢 if (key.compareTo((T) c) <= 0) break; // k節點處的值爲其子節點中最大的值 array[k] = c; // k指向其子節點最大值的那個索引位置 k = child; } // 結束調整時k指向的位置即爲插入x的值,即key array[k] = key; } } /** * 同上,區別在於這個使用了一個比較對象cmp,上邊是天然排序 */ private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
private E dequeue() { // 當前隊列元素的長度 int n = size - 1; // 無值 返回null if (n < 0) return null; else { Object[] array = queue; // 保存堆頂元素array[0] E result = (E) array[0]; // 保存最後一個元素 E x = (E) array[n]; // 置空清除最後一個元素 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) // 經過天然排序使得整個堆保持小頂堆的特性,下面說 siftDownComparable(0, x, array, n); else // 經過傳入的比較類對象排序使得整個堆保持小頂堆的特性 siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
/** * 構造方法傳入集合時用到 * 將集合進行堆化操做,知足堆的特性 */ private void heapify() { Object[] array = queue; int n = size; // 最後一個非葉子節點 int half = (n >>> 1) - 1; Comparator<? super E> cmp = comparator; if (cmp == null) { // 從最後一個非葉子節點開始調整 // 這裏使用的是siftDownComparable,使得節點及其子節點知足堆特性 // 逐步向上遍歷,最終使得整個數組知足堆特性 for (int i = half; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { // 同上,多了個比較對象 for (int i = half; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); } }
public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 得到鎖 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 數組容量不夠時進行擴容操做,上邊已經說過tryGrow這部分 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 入隊操做,將新節點放入最後,須要使用siftUpComparable從下往上進行調整 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 容量+1 size = n + 1; // 隊列有數據則喚醒阻塞的出隊線程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; }
在刪除36節點後,若是把24節點放入刪除後的節點上,此時會致使array[i] == moved,須要向上調整,其實也是由於堆的特性致使,堆只保證了堆頂元素的有序性,其餘元素若是調整則須要從新進行平衡操做
private void removeAt(int i) { Object[] array = queue; int n = size - 1; // 移除最後一個節點,不須要調整堆 if (n == i) // removed last element array[i] = null; else { // 下面至關於刪除隊列中間某個節點進行的操做 // 相似出隊操做,只不過如今不必定是堆頂 E moved = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) // 至關於將n處的節點放入刪除的節點位置i處,以後向下進行堆化平衡操做 siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); // 向下無平衡操做,則需向上進行堆化平衡操做 // 若是向下調整了,則i處的子節點取代了i,原來i處的節點必定大於等於i的父節點,因此i的子節點也大於等於i的父節點 if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } } size = n; }
public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { // 集合長度 int n = Math.min(size, maxElements); for (int i = 0; i < n; i++) { // 先添加 c.add((E) queue[0]); // In this order, in case add() throws. // 出隊操做,堆已平衡 dequeue(); } return n; } finally { lock.unlock(); } }
public Iterator<E> iterator() { return new Itr(toArray()); } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return Arrays.copyOf(queue, size); } finally { lock.unlock(); } }
/** * 迭代器,其中的數組是拷貝了當前數組的快照 */ final class Itr implements Iterator<E> { // 數組,這裏保存的實際上是原數組的快照,參考迭代調用方法 final Object[] array; // Array of all elements // 遊標,下一次next執行時對應的值的索引 int cursor; // index of next element to return // 上一個next元素索引值,即上一次next()執行返回的那個值的索引,無則爲-1 int lastRet; // index of last element, or -1 if no such // 獲取迭代器時調用,看上邊源碼 Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } public E next() { // 遊標指向已超過數組長度,拋錯 if (cursor >= array.length) throw new NoSuchElementException(); // 更新lastRet,記錄next值索引 lastRet = cursor; // 返回next應該獲取的值,同時遊標索引+1 return (E)array[cursor++]; } public void remove() { // 無值拋錯 if (lastRet < 0) throw new IllegalStateException(); // 這裏調用removeEQ方法進行移除操做,注意,這裏刪除的是PriorityBlockingQueue的原數組中對應的值,不是這個拷貝數組 removeEQ(array[lastRet]); // 置爲-1 lastRet = -1; } }