JUC源碼分析-集合篇(七)PriorityBlockingQueue

JUC源碼分析-集合篇(七)PriorityBlockingQueue

PriorityBlockingQueue 是帶優先級的無界阻塞隊列,每次出隊都返回優先級最高的元素,是二叉樹最小堆的實現。 PriorityBlockingQueue 數據結構和 PriorityQueue 一致,而線程安全性使用的是 ReentrantLock。html

1. 基本屬性

// 最大可分配隊列容量 Integer.MAX_VALUE - 8,減 8 是由於有的 VM 實如今數組頭有些內容
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默認隊列容量11,這裏不是 HashMap,不須要 hash 取餘,所以沒必要是 2^n
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 數組結構,是二叉樹最小堆的實現
private transient Object[] queue;
private transient int size;

PriorityBlockingQueue 使用 ReentrantLock 保證數據安全性,數據結構使用的是數組。PriorityBlockingQueue 數組的結構和 PriorityQueue 一致,是基於平衡二叉堆實現,父節點下標是 n,左節點則是 2n + 1,右節點是 2n + 2。queue[0] 永遠都是最小的元素。java

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

2. 入隊 offer

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 1. 擴容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // 2. 將節點 e 插入數據 array 的第 n 個位置
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

代碼很簡單,添加元素時 offer 作了兩件事:一是判斷是否須要擴容(tryGrow),二是將元素 e 插入到數組中(siftUpComparable)。先看一下如何進行擴容的,至於元素添加在 poll 時再一塊兒分析。數組

// 集合中元素個數 size>=queue.length 則進行擴容
while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);

// tryGrow 最終只有一個線程能擴容成功,其它線程經過 while 自旋檢查當前擴容是否完畢
private void tryGrow(Object[] array, int oldCap) {
    // 1. 釋放鎖,這樣在數組擴容期間其它線程能夠正常出隊
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 2. allocationSpinLock 是數組擴容的獨佔鎖
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // oldGap<64則擴容新增oldcap+2,否者擴容50%,而且最大爲MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 3.1 若是 oldCap=MAX_ARRAY_SIZE 則 newCap 就會變成負數
            // 3.2 若是 queue 已經改變,則有其它線程已經完成擴容 ok
            //     線程1已經完成擴容,線程2執行到這裏時 queue=newArray
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    // 4. 線程1 cas 成功後,線程2會進入這個地方,而後線程2讓出 cpu
    //    儘可能讓線程1執行下面代碼獲取鎖,可是這得不到確定的保證。
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();

    // 5. 從新獲取鎖,只有一個線程能夠最終完成數組的擴容。
    //    cas 只進行了數組的初始化,即 newArray=new Object[newCap],可能有多個線程都成功了
    lock.lock();
    // 6. 數組元素拷貝到新數組中,完成擴容。可能有多個線程都初始化了 newArray
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow 目的是擴容,這裏要思考下爲啥在擴容前要先釋放鎖,而後使用 cas 控制只有一個線程能夠擴容成功呢?安全

其實這裏不先釋放鎖也是能夠的,也就是在整個擴容期間一直持有鎖,可是擴容是須要花時間的,若是擴容的時候還佔用鎖,那麼其餘線程在這個時候是不能進行出隊和入隊操做的,這大大下降了併發性。數據結構

因此在擴容前釋放鎖,這容許其餘出隊線程能夠進行出隊操做,可是因爲釋放了鎖,因此也容許在擴容時候進行入隊操做,這就會致使多個線程進行擴容會出現問題,因此這裏使用了一個 spinlock 用 cas 控 制只有一個線程能夠進行擴容,失敗的線程調用 Thread.yield() 讓出 cpu,目的意在讓擴容線程擴容後優先調用 lock.lock 從新獲取鎖,可是這得不到必定的保證,有可能調用 Thread.yield() 的線程先獲取了鎖。若是這時候擴容線程還沒擴容完畢,其餘線程是經過自旋檢查當前擴容是否完畢。併發

那 copy 元素數據到新數組爲啥放到獲取鎖後面那?緣由應該是由於可見性問題,由於 queue 並無被 volatile 修飾。另外有可能在擴容時候進行了出隊操做,若是直接拷貝可能看到的數組元素不是最新的。而經過調用 Lock 後,獲取的數組則是最新的,而且在釋放鎖前 數組內容不會變化。oop

3. 出隊 poll

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
// 出隊
private E dequeue() {
    int n = size - 1;
    // 1. 沒有元素直接返回 null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 2. array[0] 永遠都是最小的元素
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 3. 由於 array[0] 已經出隊,如今須要將元素 array[n] 插入到 0 這個位置
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        // 4. 返回 array[0]
        return result;
    }
}

4. 數據結構

// 數組結構,是二叉樹最小堆的實現,array[0] 永遠是優先級最高的元素
private transient Object[] queue;

// offer 時將元素 e 插入到節點 n 位置
siftUpComparable(n, e, array);
// poll 時將最後一個元素 array[n] 插入到 0 位置
siftDownComparable(0, x, array, n);

(1) siftUpComparable源碼分析

// 將元素 x 插入數據 array 的第 k 個位置
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

這個排序看過去有些奇怪,怎麼有 parent,而且下標是 (k-1)>>>1 呢?其實這裏的操做就反應了優先隊列的真正數據結構,其其實是一個二叉樹,將二叉樹存儲在數組之中而已。根節點就是數組的 0 位。下圖給出其具體結構:ui

最小堆二叉樹

PriorityQueue 是一個徹底二叉樹,且不容許出現 null 節點,其父節點都比葉子節點小,這個是堆排序中的小頂堆。二叉樹存入數組的方式很簡單,就是從上到下,從左到右。徹底二叉樹能夠和數組中的位置一一對應:this

  • 左葉子節點 = 父節點下標 * 2 + 1
  • 右葉子節點 = 父節點下標 * 2 + 2
  • 父節點 = (葉子節點 - 1) / 2

如今在看 siftUpComparable 代碼就輕鬆多了,實際上就是將要插入的元素 x 和它的父節點作對比,若是比父節點大就一直向上移動。由於比較後元素是在向上移動,因此叫 siftUpComparable

(2) siftDownComparable

// 將元素 x 插入數據 array 的第 k 個位置,n 表示當前數組的最後一個位置
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

siftDownComparable(0, x, array, n) 將元素 x 和第 0 位的左右子節點進行比較,若是 x 大於這兩個子節點則向下移動,小的子節點則上移。這樣 array[0] 又變成最小的值了。

參考:

  1. 併發隊列-無界阻塞優先級隊列PriorityBlockingQueue原理探究
  2. Java之集合(六)PriorityQueue

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索