Java併發包源碼學習系列:阻塞隊列實現之PriorityBlockingQueue源碼解析

系列傳送門:html

PriorityBlockingQueue概述

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列,基於數組的二叉堆,其實就是線程安全的PriorityQueuejava

指定排序規則有兩種方式:編程

  1. 傳入PriorityBlockingQueue中的元素實現Comparable接口,自定義compareTo方法。
  2. 初始化PriorityBlockingQueue時,指定構造參數Comparator,自定義compare方法來對元素進行排序。

須要注意的是若是兩個對象的優先級相同,此隊列並不保證它們之間的順序。api

PriorityBlocking能夠傳入一個初始容量,其實也就是底層數組的最小容量,以後會使用tryGrow擴容。數組

類圖結構及重要字段

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;
    
    /**
     * 默認的容量爲 11 
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 數組的最大容量
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 平衡二叉堆 實現 優先級隊列, 底層用數組結構存儲二叉堆 
     * 假設一個n爲數組中的索引,數組是從索引0開始存儲元素的,所以
     * queue[n]的左兒子存在queue[2*n+1]位置,右兒子存在queue[2*(n+1)]位置
     * 
     * 根據比較器排序,若是沒有指定比較器,則按照元素天然順序排序。
     * 默認是小根堆,第一個元素是堆中最小元素
     *
     */
    private transient Object[] queue;

    /**
     * 優先級隊列中元素個數
     */
    private transient int size;

    /**
     * 比較器,若是按照天然序排序,那麼此屬性可設置爲 null
     */
    private transient Comparator<? super E> comparator;

    /**
     * 全部須要保證線程安全的操做都要先獲取這把鎖
     */
    private final ReentrantLock lock;

    /**
     * 隊列空的時候,條件隊列存放阻塞線程,爲何沒有隊列滿呢?緣由在於它是無界隊列
     */
    private final Condition notEmpty;

    /**
     * 用於CAS操做,後面會看到,這個字段用於擴容時
     */
    private transient volatile int allocationSpinLock;

    /**
     * 只用於序列化和反序列化
     */
    private PriorityQueue<E> q;
    
}

什麼是二叉堆

這邊安利一個數據結構的可視化網站:數據結構可視化網站安全

二叉堆是徹底二叉樹,除了最後一層,其餘節點都是滿的,且最後一層節點從左到右排列,以下:數據結構

二叉堆分爲大根堆和小根堆,通常來講都是小根堆任意一個節點都小於它的左右子節點的值,根節點就是堆中的最小的值併發

堆可使用數組存儲,數組的下標能夠從0開始,也能夠從1開始,各有好處,固然JDK中堆的實現是從0開始的哦。工具

  • 若是從索引爲1的位置開始存儲元素,第k個節點的左右子節點的下標:(2k, 2k + 1),父節點的座標能夠很容易求:floor(k / 2),floor表示下取整。
  • 若是從0開始,第k個節點的左右子節點的下標:(2k + 1, 2k + 2),父節點的座標也能夠很容易求:floor((k - 1) / 2),floor表示下取整。

我以前手寫堆的時候,都是使用的第一種方式,我就提一嘴第一種的思路,使用第一種思路介紹一下小根堆的幾個基本操做,以後咱們會詳細分析JDK中的實現,也就是第二種。oop

堆的基本操做

堆中最重要核心的兩個操做即是如何將元素向上調整or向下調整

向上調整void up(int u)

以插入操做爲例,二話不說,直接在數組末尾插上元素,接着再一一貫上層比較,比較的原則的就是:咱們只須要比較當前這個數是否是比它的父節點小,若是比它小,就進行交換,不然則中止交換。

思路很是簡單,你能夠思考一下其合理性:咱們想,若是咱們每次插入數據的時候,都作一次向上調整的操做,咱們必定可以保證,每次都是在一個符合條件的二叉堆上插入數,對吧。那這樣的話,自己就知足任何一個父節點一定比其子節點小的條件,若是待調整節點更小,那他必然也小於另外一個子節點,因爲咱們一直迭代作,最後必定會知足條件。

// 向上調整 u 是當前的索引
    private void up (int u) {
        // 若是發現當前的節點比父節點小
        while (u / 2 > 0 && h[u / 2] > h[u]) {
            // 就和父節點交換一下
            heap_swap(u / 2, u);
            u /= 2;
        }
    }

這邊也給出插入一個元素x的僞代碼:

void insert(int x){
        size ++; // 最後一個元素指針
        heap[size] = x; // 賦值
        up(size); // 向上調整
    }

向下調整void down(int u)

爲何須要向下調整呢,以刪除操做爲例,咱們知道,要在數組頭部刪除一個元素且保證後面元素的順序是比較麻煩的,咱們一般在遇到刪除堆頂的時候,直接將數組的最後一個元素heap[size--]將heap[0]覆蓋,接着執行down(0),自上而下地執行調整操做。

調整的規則也比較簡單,其實就是判斷當前元素和左右孩子的大小關係,和最小的那個交換,遞歸地去調整,直到沒法交換爲止。

// 向下調整
    private void down (int u) {
        int t = u;
        if (u * 2 <= size && h[u * 2] < h[t]) t = u * 2; // 判斷左兒子是否存在, 且若是左兒子比它小,就更新座標
        if (u * 2 + 1 <= size && h[u * 2 + 1] < h[t]) t = u * 2 + 1; // 同理
        if (u != t) { // 若是須要交換
            heap_swap(u, t);// 交換一下
            down(t); // 繼續作這個操做
        }
    }

這邊給出刪除小根堆中的最小值的僞代碼:

int poll(){
        int res = heap[1]; // 堆頂是最小值
        heap[1] = heap[size--]; // 直接將最後一個元素覆蓋堆頂,並size-1
        down(1); // 執行向下調整
        return res;
    }

咱們但願刪除第k個元素或者更新第k個元素都是比較簡便的:

// 刪除位置爲k的元素
void removeAt(int k){
    heap[k] = heap[size --];
    // 分別作一次向下操做和向上操做,其中一個判斷一定只會執行一次
    down(k);
    up(k);
}
// 更新位置爲k的元素爲x
void updateAt(int k, int x){
    heap[k] = x;
    down(k);
    up(k);
}

到這裏,我就用簡略代碼簡單地介紹了二叉堆的核心操做,咱們待會會看到其實源碼的思想不變,可是考慮的東西會更多一些,若是到這裏你可以徹底明白,源碼的實現其實也就不難啦。

構造器

// 使用默認的容量11
	public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

	// 指定容量大小
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
	
	// 指定容量和比較器
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

    // 傳入集合
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        // 須要堆化,後面說明該方法
        if (heapify)
            heapify();
    }

接下來我將會把一些核心組件方法都拎出來分析一下,他們頗有可能會在後面的操做方法中被頻繁調用,因此接下來很重要哦。

擴容方法tryGrow

咱們說了,PriorityBlockingQueue是無界的隊列,傳入的capacity也不是最終的容量,它和咱們以前學習的許多集合同樣,有動態擴容的機制,咱們先來瞅一瞅:

private void tryGrow(Object[] array, int oldCap) {
        // 釋放鎖的操做
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // CAS 操做將allocationSpinLock變爲1, 若是已是1了,就跳到下面
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 節點個數<64  new = old + old + 2
                // 節點個數>=64 new = old + old / 2
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // 但願節點數較小的時候,增加快一點
                                       (oldCap >> 1));
                // 擴容以後越界了
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                //queue != array 的狀況 其餘線程已經爲queue分配了其餘的空間
                if (newCap > oldCap && queue == array)
                    // 分配一個加大容量的數組
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // 多是其餘線程在進行擴容操做
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 從新獲取鎖
        lock.lock();
        // 複製元素
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

能夠發現的是,在動態擴容以前,將lock釋放,代表這個方法必定是在獲取鎖以後才被調用的。

爲啥在擴容以前先釋放鎖,並使用CAS控制只有一個線程能夠擴容成功呢?

擴容是須要時間的,若是在整個擴容期間一直持有鎖的話,其餘線程在這時是不能進行出隊和入隊操做的,這大大下降了併發性能。

spinlock鎖使用CAS控制只有一個線程能夠進行擴容,失敗的線程執行Thread.yield()讓出CPU,目的是讓擴容的線程優先調用lock.lock()優先獲取鎖,可是這得不到保證,所以須要後面的判斷。

另外自旋鎖變量allocationSpinLock在擴容結束後重置爲0,並無使用UNSAFE方法的CAS進行設置是由於:

  1. 同時只可能有一個線程獲取到該鎖。
  2. allocationSpinLock是volatile修飾。

源碼中向上調整和向下調整實現

準確地說,源碼中應該是調整 + 插入,不斷調整,找到插入的位置,給該位置賦值。但,若是你理解了前面的調整思想,相信你會很快理解源碼中的實現。

siftUpComparable

將x插入到堆中,注意這裏是不斷和父節點比較,最終找到插入位置。

// 將x插入到堆中,注意這裏是不斷和父節點比較,最終找到插入位置
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    // 若是不傳入Comparable的實現,這裏會強轉失敗,拋出異常
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        //a[k]的父節點位置
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        // 若是比父節點大就不用交換了
        if (key.compareTo((T) e) >= 0)
            break;
        // 將父元素移下來
        array[k] = e;
        // k向上移
        k = parent;
    }
    // 退出循環後,k的位置就是待插入的位置
    array[k] = key;
}

siftDownComparable

移除k位置的元素,並調整二叉堆,具體思想就是,通常經過向下調整找到覆蓋位置,用x覆蓋便可,x通常能夠從隊尾獲取。

// 這裏的k就是當前空缺的位置,x就是覆蓋元素好比咱們以前說的隊尾元素
	private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            // 二叉堆有一個性質,最後一層葉子最多 佔 1 / 2
            int half = n >>> 1;           // loop while a non-leaf
            // 循環非葉子節點
            while (k < half) {
                // 左孩子
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                // 右孩子
                int right = child + 1;
                // 始終用左孩子c表示最小的數
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    // 這裏若是右孩子小,更新child = right
                    c = array[child = right];
                // 若是當前的k比左孩子還要小,那就沒必要交換了,待在那正好!
                if (key.compareTo((T) c) <= 0)
                    break;
                // 小的數向上移,k向下更新
                array[k] = c;
                k = child;
            }
            // 退出循環時,必定找到了x覆蓋的位置,覆蓋便可
            array[k] = key;
        }
    }

你看看,理解了調整的思想以後,看起代碼來是否是就相對輕鬆不少啦?

heapify建堆or堆化

heapify方法可使節點任意放置的二叉樹,在O(N)的時間複雜度內轉變爲二叉堆,具體作法是,從最後一層非葉子節點自底向上執行down操做

private void heapify() {
        Object[] array = queue;
        int n = size;
        int half = (n >>> 1) - 1; // 最後一層非葉子層
        // 兩種排序規則下, 自底向上 地執行 siftdown操做
        Comparator<? super E> cmp = comparator;
        if (cmp == null) {
            for (int i = half; i >= 0; i--)
                siftDownComparable(i, (E) array[i], array, n);
        }
        else {
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
        }
    }

put非阻塞式插入

put方法是非阻塞的,可是操做時須要獲取獨佔鎖,若是插入元素後超過了當前的容量,會調用tryGrow進行動態擴容,接着從插入元素位置進行向上調整,插入成功後,喚醒正在阻塞的讀線程。

public void put(E e) {
        offer(e); // 無界隊列,插入操做不須要阻塞哦
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 當前隊列中的元素個數 >= 數組的容量
        while ((n = size) >= (cap = (array = queue).length))
            // 動態擴容
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // 下面這個if else根據是否傳入比較器選擇對應的方法,大差不差
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // 喚醒正在阻塞的讀線程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

take阻塞式獲取

take方法是阻塞式的,若是隊列爲空,則當前線程阻塞在notEmpty維護的條件隊列中。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lockInterruptibly();
        E result;
        try {
            // 出隊
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

	// 出隊邏輯
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 保存隊頭的值,也就是返回這個值
            E result = (E) array[0];
            // 準備將隊尾的值 覆蓋第一個
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

remove移除指定元素

public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 找到匹配元素下標
            int i = indexOf(o);
            if (i == -1)
                return false;
            // 移除該下標的元素
            removeAt(i);
            return true;
        } finally {
            lock.unlock();
        }
    }
	// 遍歷底層數組, 找到匹配元素的下標
    private int indexOf(Object o) {
        if (o != null) {
            Object[] array = queue;
            int n = size;
            for (int i = 0; i < n; i++)
                if (o.equals(array[i]))
                    return i;
        }
        return -1;
    }

	// 移除下標爲i的元素
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element
            array[i] = null;
        else {
            // 老套路了,讓隊尾的元素覆蓋這裏
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            // 向下調整
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            // 向下調整沒成功,向上調整
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
            // 這也是慣用作法,上下分別作一次調整
        }
        size = n;
    }

總結

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列,基於數組的二叉堆,其實就是線程安全的PriorityQueue

內部使用一個獨佔鎖來同時控制只有一個線程執行入隊和出隊操做,只是用notEmpty條件變量來控制讀線程的阻塞,由於無界隊列中入隊操做是不會阻塞的。

指定排序規則有兩種方式:

  1. 傳入PriorityBlockingQueue中的元素實現Comparable接口,自定義compareTo方法。
  2. 初始化PriorityBlockingQueue時,指定構造參數Comparator,自定義compare方法來對元素進行排序。

底層數組是可動態擴容的:先釋放鎖,保證擴容操做和讀操做能夠同時進行,提升吞吐量,接着經過CAS自旋保證擴容操做的併發安全,若是原容量爲old_c,擴容後容量爲new_c,知足:

if (old_c < 64) 
    new_c = 2 * old_c + 2
else 
    new_c = 1.5 * old_c

heapify方法可使節點任意放置的二叉樹,在O(N)的時間複雜度內轉變爲二叉堆,具體作法是,從最後一層非葉子節點自底向上執行down操做

參考閱讀

相關文章
相關標籤/搜索