PriorityBlockingQueue 是帶優先級的無界阻塞隊列,每次出隊都返回優先級最高的元素,是二叉樹最小堆的實現。 PriorityBlockingQueue 數據結構和 PriorityQueue 一致,而線程安全性使用的是 ReentrantLock。html
// 最大可分配隊列容量 Integer.MAX_VALUE - 8,減 8 是由於有的 VM 實如今數組頭有些內容 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 默認隊列容量11,這裏不是 HashMap,不須要 hash 取餘,所以沒必要是 2^n private static final int DEFAULT_INITIAL_CAPACITY = 11; // 數組結構,是二叉樹最小堆的實現 private transient Object[] queue; private transient int size;
PriorityBlockingQueue 使用 ReentrantLock 保證數據安全性,數據結構使用的是數組。PriorityBlockingQueue 數組的結構和 PriorityQueue 一致,是基於平衡二叉堆實現,父節點下標是 n,左節點則是 2n + 1,右節點是 2n + 2。queue[0] 永遠都是最小的元素。java
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 1. 擴容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 2. 將節點 e 插入數據 array 的第 n 個位置 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
代碼很簡單,添加元素時 offer 作了兩件事:一是判斷是否須要擴容(tryGrow),二是將元素 e 插入到數組中(siftUpComparable)。先看一下如何進行擴容的,至於元素添加在 poll 時再一塊兒分析。數組
// 集合中元素個數 size>=queue.length 則進行擴容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); // tryGrow 最終只有一個線程能擴容成功,其它線程經過 while 自旋檢查當前擴容是否完畢 private void tryGrow(Object[] array, int oldCap) { // 1. 釋放鎖,這樣在數組擴容期間其它線程能夠正常出隊 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // 2. allocationSpinLock 是數組擴容的獨佔鎖 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // oldGap<64則擴容新增oldcap+2,否者擴容50%,而且最大爲MAX_ARRAY_SIZE int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 3.1 若是 oldCap=MAX_ARRAY_SIZE 則 newCap 就會變成負數 // 3.2 若是 queue 已經改變,則有其它線程已經完成擴容 ok // 線程1已經完成擴容,線程2執行到這裏時 queue=newArray if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 4. 線程1 cas 成功後,線程2會進入這個地方,而後線程2讓出 cpu // 儘可能讓線程1執行下面代碼獲取鎖,可是這得不到確定的保證。 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 5. 從新獲取鎖,只有一個線程能夠最終完成數組的擴容。 // cas 只進行了數組的初始化,即 newArray=new Object[newCap],可能有多個線程都成功了 lock.lock(); // 6. 數組元素拷貝到新數組中,完成擴容。可能有多個線程都初始化了 newArray if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
tryGrow 目的是擴容,這裏要思考下爲啥在擴容前要先釋放鎖,而後使用 cas 控制只有一個線程能夠擴容成功呢?安全
其實這裏不先釋放鎖也是能夠的,也就是在整個擴容期間一直持有鎖,可是擴容是須要花時間的,若是擴容的時候還佔用鎖,那麼其餘線程在這個時候是不能進行出隊和入隊操做的,這大大下降了併發性。數據結構
因此在擴容前釋放鎖,這容許其餘出隊線程能夠進行出隊操做,可是因爲釋放了鎖,因此也容許在擴容時候進行入隊操做,這就會致使多個線程進行擴容會出現問題,因此這裏使用了一個 spinlock 用 cas 控 制只有一個線程能夠進行擴容,失敗的線程調用 Thread.yield() 讓出 cpu,目的意在讓擴容線程擴容後優先調用 lock.lock 從新獲取鎖,可是這得不到必定的保證,有可能調用 Thread.yield() 的線程先獲取了鎖。若是這時候擴容線程還沒擴容完畢,其餘線程是經過自旋檢查當前擴容是否完畢。併發
那 copy 元素數據到新數組爲啥放到獲取鎖後面那?緣由應該是由於可見性問題,由於 queue 並無被 volatile 修飾。另外有可能在擴容時候進行了出隊操做,若是直接拷貝可能看到的數組元素不是最新的。而經過調用 Lock 後,獲取的數組則是最新的,而且在釋放鎖前 數組內容不會變化。oop
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } // 出隊 private E dequeue() { int n = size - 1; // 1. 沒有元素直接返回 null if (n < 0) return null; else { Object[] array = queue; // 2. array[0] 永遠都是最小的元素 E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; // 3. 由於 array[0] 已經出隊,如今須要將元素 array[n] 插入到 0 這個位置 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; // 4. 返回 array[0] return result; } }
// 數組結構,是二叉樹最小堆的實現,array[0] 永遠是優先級最高的元素 private transient Object[] queue; // offer 時將元素 e 插入到節點 n 位置 siftUpComparable(n, e, array); // poll 時將最後一個元素 array[n] 插入到 0 位置 siftDownComparable(0, x, array, n);
(1) siftUpComparable源碼分析
// 將元素 x 插入數據 array 的第 k 個位置 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
這個排序看過去有些奇怪,怎麼有 parent,而且下標是 (k-1)>>>1 呢?其實這裏的操做就反應了優先隊列的真正數據結構,其其實是一個二叉樹,將二叉樹存儲在數組之中而已。根節點就是數組的 0 位。下圖給出其具體結構:ui
PriorityQueue 是一個徹底二叉樹,且不容許出現 null 節點,其父節點都比葉子節點小,這個是堆排序中的小頂堆。二叉樹存入數組的方式很簡單,就是從上到下,從左到右。徹底二叉樹能夠和數組中的位置一一對應:this
如今在看 siftUpComparable 代碼就輕鬆多了,實際上就是將要插入的元素 x 和它的父節點作對比,若是比父節點大就一直向上移動。由於比較後元素是在向上移動,因此叫 siftUpComparable
(2) siftDownComparable
// 將元素 x 插入數據 array 的第 k 個位置,n 表示當前數組的最後一個位置 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
siftDownComparable(0, x, array, n) 將元素 x 和第 0 位的左右子節點進行比較,若是 x 大於這兩個子節點則向下移動,小的子節點則上移。這樣 array[0] 又變成最小的值了。
參考:
天天用心記錄一點點。內容也許不重要,但習慣很重要!