今天繼續說一說阻塞隊列的實現,今天的主角就是優先級阻塞隊列PriorityBlockingQueue,從命名上看以爲應該是有序的,畢竟是優先級隊列,那麼其實是什麼狀況,咱們一塊兒看下其內部實現,提早說明下,由於PriorityBlockingQueue涉及到了堆排序的相關使用,若是沒了解清楚,能夠參考我以前寫的關於堆排序的相關說明java
JDK版本號:1.8.0_171
PriorityBlockingQueue是一個無限容量的阻塞隊列,固然,最終仍是受內存限制,內部實現是數組,不停增加下去會致使OOM,因爲其無限容量的特性,在入隊操做時不存在阻塞這個說法,只要內存足夠都能入隊,固然,入隊操做線程仍是須要爭搶互斥鎖的,只是不會存在隊列已滿狀況下的阻塞等待操做算法
同時,雖然這個被稱爲優先級阻塞隊列,可是入隊操做以後並不會當即進行排序調整,只有在出隊操做或drainTo轉移隊列時纔是被優先級隊列排過序的。PriorityBlockingQueue是經過Comparator來進行排序,因此入隊的對象自己已經實現Comparator接口,或者傳入一個Comparator實例對象才能夠api
PriorityBlockingQueue排序是經過最小堆實現的,以前的文章裏我已經專門說明了堆排序的算法,這裏再也不詳細說明,不明白的能夠先去參考堆排序的講解部分。優先級隊列初始容量默認爲11,當入隊空間不足時會進行擴容操做,擴容大小根據擴容前的容量決定數組
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
/** * 默認初始化數組長度11 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 容許的最大數組長度,減8是由於有可能部分虛擬機會用一部分空間來保存對象頭信息 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * * 優先級隊列經過平衡二叉堆實現,可類比堆排序算法 * 那麼queue[n]對應的左右子節點分別爲queue[2*n+1]和[2*(n+1)] * 隊列中的對象必須是可比較的,默認的天然排序或自行實現的的Comparator均可 * 隊列非空,則queue[0]爲最小值,即以最小二叉堆排序 */ private transient Object[] queue; /** * 優先級隊列queue包含的元素個數 */ private transient int size; /** * 比較器對象,在使用天然排序比較時爲null */ private transient Comparator<? super E> comparator; /** * 互斥鎖,只有一個ReentrantLock */ private final ReentrantLock lock; /** * 非空信號量,隊列爲空時阻塞出隊線程 * 只須要判斷隊列爲空的狀況,隊列沒有滿的狀況,因此纔是無限容量隊列 */ private final Condition notEmpty; /** * Spin鎖,經過CAS操做實現 */ private transient volatile int allocationSpinLock; /** * 在序列化中使用,爲了兼容老版本 */ private PriorityQueue<E> q;
allocationSpinLock在對象中的內存偏移量獲取在靜態代碼塊中實現以下,後續使用CAS操做用到oop
// Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long allocationSpinLockOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = PriorityBlockingQueue.class; allocationSpinLockOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("allocationSpinLock")); } catch (Exception e) { throw new Error(e); } }
在不傳參時,默認初始化數組長度爲11,即優先級隊列默認容量爲11,主要在於傳入集合參數時須要進行判斷是否知足PriorityBlockingQueue使用的條件,非空,可比較的對象,另外,還需判斷是否須要進行堆化操做ui
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); // 是否須要從新排序標識,即堆化標識 boolean heapify = true; // true if not known to be in heap order // 空值檢查標識 boolean screen = true; // true if must screen for nulls // 集合爲SortedSet,則使用其Comparator排序,因爲其已有序,直接複製便可,無需堆化操做 if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } // 集合爲PriorityBlockingQueue,則使用其Comparator排序 else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; // 精確到PriorityBlockingQueue類,因爲其已有序,直接複製便可,無需堆化操做 if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. // 假如沒有正確返回Object[],則複製a if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); // 對集合進行空值校驗 // this.comparator != null 判斷SortedSet類型的對象空值狀況 // n == 1 這個感受應該是對天然排序的對象作的操做 n >= 1 纔對,不然就在heapify()堆化操做比較時拋錯,能夠本身嘗試放List if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; // 堆化操做 if (heapify) heapify(); }
擴容操做,在offer中獲取鎖的時候調用,擴容以前先釋放鎖,經過CAS操做將allocationSpinLock標識置爲1,表示當前正在擴容中,擴容完畢則從新獲取鎖,allocationSpinLock標識置爲0this
private void tryGrow(Object[] array, int oldCap) { // 先釋放鎖 lock.unlock(); // must release and then re-acquire main lock // 準備的新數組 Object[] newArray = null; // 其餘線程未進行擴容操做時嘗試使用CAS更新allocationSpinLock標識爲1,成功則當前線程取得擴容操做權限 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 原數組容量小於64,則每次增加oldCap + 2 // 原數組容量大於等於64,則每次增加oldCap的一半 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 新的數組容量大於最大的數組長度限制 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; // 原數組容量加1就已經溢出或者超過最大長度限制直接拋出OOM if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); // 設置新數組容量爲最大值 newCap = MAX_ARRAY_SIZE; } // 擴容成功且當前數組沒有被其餘線程操做,則建立一個新數組 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { // 將擴容標識恢復 allocationSpinLock = 0; } } // 其餘線程已經在擴容了,讓出cpu if (newArray == null) // back off if another thread is allocating Thread.yield(); // 從新得到鎖 lock.lock(); // 擴容成功且原數組沒被其餘線程操做則複製原數組到新數組中 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
相似堆排序的操做,不一樣在於,這些方法是類比插入新節點,即數組中添加新的值時調用,添加完以後整個堆須要進行調整,Up也說明了是從下往上進行堆的平衡調整。在調用這個方法前,堆應該是已經平衡的,若是未平衡,須要先進行堆化操做,參考heapify方法。入隊操做時,將x插入k的位置上,入隊時至關於將新元素放入k的位置上(還未徹底執行,須要知足堆特性),因爲新添加元素可能會破壞整個堆,因此須要從下往上調整整個堆,直到x大於等於其父節點或者到達根節點spa
/** * @param k the position to fill * @param x the item to insert * @param array the heap array */ private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 找到k位置處的父節點 int parent = (k - 1) >>> 1; // 父節點對應的值 Object e = array[parent]; // 父子節點比較,k位置處的節點大於等於其父節點,則退出,不須要對堆進行調整了 if (key.compareTo((T) e) >= 0) break; // k位置處的節點小於其父節點,則將k位置處的值改成其父節點值 array[k] = e; // k指向其父節點,至關於堆向上遞進了一層,繼續while判斷其父節點是否須要調整 k = parent; } // 結束調整時k指向的位置即爲插入x的值,即key array[k] = key; } /** * 同上,區別在於這個使用了一個比較對象cmp,上邊是天然排序 */ private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
出隊操做時使用,出隊出的是堆頂元素,即array[0],那麼出隊完成以後堆頂元素空缺,將array[n]處的元素放入位置0處(還未真正執行,須要先驗證是否知足堆特性),這裏翻譯說的是插入,能夠這麼理解,出隊時至關於將最後一個葉子節點移動到根(堆頂位置),這裏就須要從上往下調整整個堆,使其知足堆的特性,這裏按小頂堆處理,最上邊則是最小值,需知足節點值小於其兩個子節點的值便可線程
/** * @param k the position to fill * @param x the item to insert * @param array the heap array * @param n heap size */ private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; // 最後的非葉子節點位置 int half = n >>> 1; // loop while a non-leaf while (k < half) { // k的左子節點 int child = (k << 1) + 1; // assume left child is least // k的左子節點對應的值 Object c = array[child]; // k的右子節點 int right = child + 1; // 左右子節點中最大值 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; // 對比,其節點值小於等於其左右子節點,則代表堆調整完畢 if (key.compareTo((T) c) <= 0) break; // k節點處的值爲其子節點中最大的值 array[k] = c; // k指向其子節點最大值的那個索引位置 k = child; } // 結束調整時k指向的位置即爲插入x的值,即key array[k] = key; } } /** * 同上,區別在於這個使用了一個比較對象cmp,上邊是天然排序 */ private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
出隊核心操做,須要先獲取互斥鎖才能執行,出隊元素爲array[0]節點,出隊以後進行堆排序的操做,步驟以下:翻譯
private E dequeue() { // 當前隊列元素的長度 int n = size - 1; // 無值 返回null if (n < 0) return null; else { Object[] array = queue; // 保存堆頂元素array[0] E result = (E) array[0]; // 保存最後一個元素 E x = (E) array[n]; // 置空清除最後一個元素 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) // 經過天然排序使得整個堆保持小頂堆的特性,下面說 siftDownComparable(0, x, array, n); else // 經過傳入的比較類對象排序使得整個堆保持小頂堆的特性 siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
堆化操做,從最後一個非葉子節點開始,循環對每一個節點平衡,直到堆頂,完成堆的平衡操做
/** * 構造方法傳入集合時用到 * 將集合進行堆化操做,知足堆的特性 */ private void heapify() { Object[] array = queue; int n = size; // 最後一個非葉子節點 int half = (n >>> 1) - 1; Comparator<? super E> cmp = comparator; if (cmp == null) { // 從最後一個非葉子節點開始調整 // 這裏使用的是siftDownComparable,使得節點及其子節點知足堆特性 // 逐步向上遍歷,最終使得整個數組知足堆特性 for (int i = half; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { // 同上,多了個比較對象 for (int i = half; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); } }
入隊操做,最終都是調用offer,這裏使用siftUpComparable從下向上調整,由於咱們是將新值放到了隊列最後,應向上進行調整
public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 得到鎖 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 數組容量不夠時進行擴容操做,上邊已經說過tryGrow這部分 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 入隊操做,將新節點放入最後,須要使用siftUpComparable從下往上進行調整 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 容量+1 size = n + 1; // 隊列有數據則喚醒阻塞的出隊線程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
出隊操做,最終調用dequeue,上邊已說過,其他部分同以前講過的阻塞隊列相似,不過多說明
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; }
刪除隊列中的某個元素,remove和removeEQ方法也是使用這個方法來進行操做的。這裏有個地方須要注意下,在刪除非最後一個節點時須要進行堆調整,把最後一個節點當成新值添加到刪除位置,先經過siftDownComparable/siftDownUsingComparator向下進行平衡調整,若是沒有進行調整,則須要調用siftUpComparable/siftUpUsingComparator向上進行調整,有些人可能不是很明白,其實想下堆的特性就能瞭解,在數組中並非徹底有序的,在最小堆中只要知足父節點小於等於其子節點便可,因此這裏在註釋上我也進行了說明,若是向下調整了,則i處的子節點取代了i,原來i處的節點必定大於等於i的父節點,因此i的子節點也大於等於i的父節點,不須要向上調整了。例以下圖這種狀況,就須要繼續向上調整:
在刪除36節點後,若是把24節點放入刪除後的節點上,此時會致使array[i] == moved,須要向上調整,其實也是由於堆的特性致使,堆只保證了堆頂元素的有序性,其餘元素若是調整則須要從新進行平衡操做
private void removeAt(int i) { Object[] array = queue; int n = size - 1; // 移除最後一個節點,不須要調整堆 if (n == i) // removed last element array[i] = null; else { // 下面至關於刪除隊列中間某個節點進行的操做 // 相似出隊操做,只不過如今不必定是堆頂 E moved = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) // 至關於將n處的節點放入刪除的節點位置i處,以後向下進行堆化平衡操做 siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); // 向下無平衡操做,則需向上進行堆化平衡操做 // 若是向下調整了,則i處的子節點取代了i,原來i處的節點必定大於等於i的父節點,因此i的子節點也大於等於i的父節點 if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } } size = n; }
轉移maxElements個元素到集合c中,從源碼實現上能夠看到轉移以後的元素是有序的,而不是像PriorityBlockingQueue裏的數組是無序的,每次轉移,先直接添加堆頂元素,再出隊操做,循環調用使得轉移後的集合有序
public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { // 集合長度 int n = Math.min(size, maxElements); for (int i = 0; i < n; i++) { // 先添加 c.add((E) queue[0]); // In this order, in case add() throws. // 出隊操做,堆已平衡 dequeue(); } return n; } finally { lock.unlock(); } }
迭代器使用時的方法以下,每次調用建立一個新的迭代器對象Itr,入參調用了toArray()方法,拷貝了當前數組隊列,不是直接放入的原數組隊列
public Iterator<E> iterator() { return new Itr(toArray()); } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return Arrays.copyOf(queue, size); } finally { lock.unlock(); } }
看下其迭代器實現類,保存的是當前數組的一個拷貝,可是remove操做是刪除的PriorityBlockingQueue原數組中對應的元素,須要注意
/** * 迭代器,其中的數組是拷貝了當前數組的快照 */ final class Itr implements Iterator<E> { // 數組,這裏保存的實際上是原數組的快照,參考迭代調用方法 final Object[] array; // Array of all elements // 遊標,下一次next執行時對應的值的索引 int cursor; // index of next element to return // 上一個next元素索引值,即上一次next()執行返回的那個值的索引,無則爲-1 int lastRet; // index of last element, or -1 if no such // 獲取迭代器時調用,看上邊源碼 Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } public E next() { // 遊標指向已超過數組長度,拋錯 if (cursor >= array.length) throw new NoSuchElementException(); // 更新lastRet,記錄next值索引 lastRet = cursor; // 返回next應該獲取的值,同時遊標索引+1 return (E)array[cursor++]; } public void remove() { // 無值拋錯 if (lastRet < 0) throw new IllegalStateException(); // 這裏調用removeEQ方法進行移除操做,注意,這裏刪除的是PriorityBlockingQueue的原數組中對應的值,不是這個拷貝數組 removeEQ(array[lastRet]); // 置爲-1 lastRet = -1; } }
迭代器是原數組的一個快照版本,故也是無序的,若是想經過迭代器獲取有序數組是不可能的,同時,使用時須要注意remove方法,避免誤刪
至此,PriorityBlockingQueue源碼基本說明完畢,須要理解的在於如下幾點:
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝