本文首發於一世流雲專欄: https://segmentfault.com/blog...
PriorityBlockingQueue
,是在JDK1.5時,隨着J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於堆實現:java
PriorityBlockingQueue是一種無界阻塞隊列,在構造的時候能夠指定隊列的初始容量。具備以下特色:segmentfault
Integer.MAX_VALUE
;Comparable
接口;關於堆,若是讀者不瞭解,能夠參考下個人這篇博文預熱下——優先級隊列。api
注意: 堆分爲「大頂堆」和「小頂堆」,PriorityBlockingQueue會依據元素的比較方式選擇構建大頂堆或小頂堆。好比:若是元素是Integer這種引用類型,那麼默認就是「小頂堆」,也就是每次出隊都會是當前隊列最小的元素。
PriorityBlockingQueue提供了四種構造器:數組
/** * 默認構造器. * 默認初始容量11, 以元素天然順序比較(元素必須實現Comparable接口) */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
/** * 指定初始容量的構造器. * 以元素天然順序比較(元素必須實現Comparable接口) */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }
/** * 指定初始容量和比較器的構造器. */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
/** * 從已有集合構造隊列. * 若是已經集合是SortedSet或者PriorityBlockingQueue, 則保持原來的元素順序 */ public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { // 若是是有序集合 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { // 若是是優先級隊列 PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { // 校驗是否存在null元素 for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) // 堆排序 heapify(); }
重點是第三種構造器,能夠看到,PriorityBlockingQueue內部也是利用了ReentrantLock來保證併發訪問時的線程安全。
PriorityBlockingQueue若是不指定容量,默認容量爲11,內部數組queue實際上是一種二叉樹,後續咱們會詳細介紹。安全
須要注意的是,PriorityBlockingQueue只有一個條件等待隊列——notEmpty
,由於構造時不會限制最大容量且會自動擴容,因此插入元素並不會阻塞,僅當隊列爲空時,纔可能阻塞「出隊」線程。併發
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 默認容量. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 最大容量. */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 內部堆數組, 保存實際數據, 能夠當作一顆二叉樹: * 對於頂點queue[n], queue[2*n+1]表示左子結點, queue[2*(n+1)]表示右子結點. */ private transient Object[] queue; /** * 隊列中的元素個數. */ private transient int size; /** * 比較器, 若是爲null, 表示以元素自身的天然順序進行比較(元素必須實現Comparable接口). */ private transient Comparator<? super E> comparator; /** * 全局鎖. */ private final ReentrantLock lock; /** * 當隊列爲空時,出隊線程在該條件隊列上等待. */ private final Condition notEmpty; // ... }
PriorityBlockingQueue插入元素不會阻塞線程,put(E e)
方法內部實際上是調用了offer(E e)
方法:
首先獲取全局鎖(對於隊列的修改都要獲取這把鎖),而後判斷下隊列是否已經滿了,若是滿了就先進行一次內部數組的擴容(關於擴容,咱們後面會專門講):this
/** * 向隊列中插入指定元素. * 因爲隊列是無界的,因此不會阻塞線程. */ public void put(E e) { offer(e); // never need to block } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) // 隊列已滿, 則進行擴容 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) // 比較器爲空, 則按照元素的天然順序進行堆調整 siftUpComparable(n, e, array); else // 比較器非空, 則按照比較器進行堆調整 siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 隊列元素總數+1 notEmpty.signal(); // 喚醒一個可能正在等待的"出隊線程" } finally { lock.unlock(); } return true; }
上面最關鍵的是siftUpComparable和siftUpUsingComparator方法,這兩個方法內部幾乎同樣,只不過前者是一個根據元素的天然順序比較,後者則根據外部比較器比較,咱們重點看下siftUpComparable方法:spa
/** * 將元素x插入到array[k]的位置. * 而後按照元素的天然順序進行堆調整——"上浮",以維持"堆"有序. * 最終的結果是一個"小頂堆". */ private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; // 至關於(k-1)除2, 就是求k結點的父結點索引parent Object e = array[parent]; if (key.compareTo((T) e) >= 0) // 若是插入的結點值大於父結點, 則退出 break; // 不然,交換父結點和當前結點的值 array[k] = e; k = parent; } array[k] = key; }
siftUpComparable方法的做用其實就是堆的「上浮調整」,能夠把堆能夠想象成一棵徹底二叉樹,每次插入元素都連接到二叉樹的最右下方,而後將插入的元素與其父結點比較,若是父結點大,則交換元素,直到沒有父結點比插入的結點大爲止。這樣就保證了堆頂(二叉樹的根結點)必定是最小的元素。(注:以上僅針對「小頂堆」)線程
咱們經過示例來理解下入隊的整個過程:假設初始構造的隊列大小爲6,依次插入九、二、9三、十、2五、90。3d
①初始隊列狀況
②插入元素9(索引0處)
將上述數組想象成一棵徹底二叉樹,其實就是下面的結構:
③插入元素2(索引1處)
對應的二叉樹:
因爲結點2的父結點爲9,因此要進行「上浮調整」,最終隊列結構以下:
④插入元素93(索引2處)
⑤插入元素10(索引3處)
⑥插入元素25(索引4處)
⑦插入元素90(索引5處)
此時,堆不知足有序條件,由於「90」的父結點「93」大於它,因此須要「上浮調整」:
最終,堆的結構如上,能夠看到,通過調整後,堆頂元素必定是最小的。
在入隊過程當中,若是隊列內部的queue
數組已經滿了,就須要進行擴容:
public boolean offer(E e) { // ... while ((n = size) >= (cap = (array = queue).length)) // 隊列已滿, 則進行擴容 tryGrow(array, cap); // ... }
咱們來看下tryGrow方法:
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // 擴容和入隊/出隊能夠同時進行, 因此先釋放全局鎖 Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { // allocationSpinLock置1表示正在擴容 try { // 計算新的數組大小 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // 溢出判斷 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; // 分配新數組 } finally { allocationSpinLock = 0; } } if (newArray == null) // 擴容失敗(可能有其它線程正在擴容,致使allocationSpinLock競爭失敗) Thread.yield(); lock.lock(); // 獲取全局鎖(由於要修改內部數組queue) if (newArray != null && queue == array) { queue = newArray; // 指向新的內部數組 System.arraycopy(array, 0, newArray, 0, oldCap); } }
上述整個過程仍是比較清晰的,因爲調用tryGrow的方法必定會先獲取全局鎖,因此先釋放鎖,由於可能有線程正在出隊,擴容/出隊是能夠併發執行的(擴容的前半部分只是新建一個內部數組,不會對出隊產生影響)。擴容後的內部數組大小通常爲原來的2倍。
上述須要注意的是allocationSpinLock
字段,該字段經過CAS操做,置1表示有線程正在進行擴容。
刪除元素(出隊)的整個過程比較簡單,也是先獲取全局鎖,而後判斷隊列狀態,若是是空,則阻塞線程,不然調用dequeue
方法出隊:
/** * 出隊一個元素. * 若是隊列爲空, 則阻塞線程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 獲取全局鎖 E result; try { while ((result = dequeue()) == null) // 隊列爲空 notEmpty.await(); // 線程在noEmpty條件隊列等待 } finally { lock.unlock(); } return result; } private E dequeue() { int n = size - 1; // n表示出隊後的剩餘元素個數 if (n < 0) // 隊列爲空, 則返回null return null; else { Object[] array = queue; E result = (E) array[0]; // array[0]是堆頂結點, 每次出隊都刪除堆頂結點 E x = (E) array[n]; // array[n]是堆的最後一個結點, 也就是二叉樹的最右下結點 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
從dequeue方法能夠看出,每次出隊的元素都是「堆頂結點」,對於「小頂堆」就是隊列中的最小值,對於「大頂堆」就是隊列中的最大值。
咱們看下siftDownComparable方法如何實現堆頂點的刪除:
/** * 堆的"下沉"調整. * 刪除array[k]對應的結點,並從新調整堆使其有序. * * @param k 待刪除的位置 * @param x 待比較的健 * @param array 堆數組 * @param n 堆的大小 */ private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>) x; int half = n >>> 1; // 至關於n除2, 即找到索引n對應結點的父結點 while (k < half) { /** * 下述代碼中: * c保存k的左右子結點中的較小結點值 * child保存較小結點對應的索引 */ int child = (k << 1) + 1; // k的左子結點 Object c = array[child]; int right = child + 1; // k的右子結點 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
上述代碼實際上是經典的堆「下沉」操做,對堆中某個頂點下沉,步驟以下:
來看個示例,假設堆的初始結構以下,如今出隊一個元素(索引0位置的元素2)。
①初始狀態
對應二叉樹結構:
②將頂點與最後一個結點調換
即將頂點「2」與最後一個結點「93」交換,而後將索引5爲止置null。
注意: 爲了提高效率(好比siftDownComparable的源碼所示)並不必定要真正交換,能夠用一個變量保存索引5處的結點值,在整個下沉操做完成後再替換。可是爲了理解這一過程,示例圖中全是以交換進行的。
③下沉索引0處結點
比較元素「93」和左右子結點中的最小者,發現「93」大於「9」,違反了「小頂堆」的規則,因此交換「93」和「9」,這一過程稱爲siftdown(下沉):
④繼續下沉索引1處結點
比較元素「93」和左右子結點中的最小者,發現「93」大於「10」,違反了「小頂堆」的規則,因此交換「93」和「10」:
⑤比較結束
因爲「93」已經沒有左右子結點了,因此下沉結束,能夠看到,此時堆恢復了有序狀態,最終隊列結構以下:
PriorityBlockingQueue屬於比較特殊的阻塞隊列,適用於有元素優先級要求的場景。它的內部和ArrayBlockingQueue同樣,使用一個了全局獨佔鎖來控制同時只有一個線程能夠進行入隊和出隊,另外因爲該隊列是無界隊列,因此入隊線程並不會阻塞。
PriorityBlockingQueue始終保證出隊的元素是優先級最高的元素,而且能夠定製優先級的規則,內部經過使用堆(數組形式)來維護元素順序,它的內部數組是可擴容的,擴容和出/入隊能夠併發進行。