Java併發編程筆記之PriorityBlockingQueue源碼分析

JDK 中無界優先級隊列PriorityBlockingQueue 內部使用堆算法保證每次出隊都是優先級最高的元素,元素入隊時候是如何建堆的,元素出隊後如何調整堆的平衡的?算法

PriorityBlockingQueue是帶優先級的無界阻塞隊列,每次出隊都返回優先級最好或者最低的元素,內部是平衡二叉樹堆的實現。數組

首先看一下PriorityBlockingQueue類圖結構,以下:數據結構

能夠看到PriorityBlockingQueue內部有個數組queue用來存放隊列元素,size用來存放隊列元素個數,allocationSpinLock 是個自旋鎖,用CAS操做來保證只有一個線程能夠擴容隊列,多線程

狀態爲0 或者1,其中0標示當前沒有在進行擴容,1標示當前正在擴容。併發

 

咱們首先看看PriorityBlockingQueue的構造函數,源碼以下:函數

 private static final int DEFAULT_INITIAL_CAPACITY = 11;

 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

如上構造函數,默認隊列容量爲11,默認比較器爲null,也就是使用元素的compareTo方法進行比較來肯定元素的優先級,這意味着隊列元素必須實現Comparable接口。oop

 

接下來咱們主要看PriorityBlockingQueue的幾個操做的源碼,以下:性能

  1.offer 操做,offer操做的做用是在隊列插入一個元素,因爲是無界隊列,因此一直返回true,源碼以下:this

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();

    //獲取獨佔鎖
    final ReentrantLock lock = this.lock;
    lock.lock();

    int n, cap;
    Object[] array;

    //若是當前元素個數>=隊列容量,則擴容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);

    try {
        Comparator<? super E> cmp = comparator;

        //默認比較器爲null (2)
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //自定義比較器 (3)
            siftUpUsingComparator(n, e, array, cmp);

        //隊列元素增長1,而且激活notEmpty的條件隊列裏面的一個阻塞線程(9)
        size = n + 1;
        notEmpty.signal();//激活調用take()方法被阻塞的線程
    } finally {
        //釋放獨佔鎖
        lock.unlock();
    }
    return true;
}

能夠看到上面代碼,offer操做主流程比較簡單,接下來主要關注PriorityBlockingQueue是如何進行擴容的和內部如何創建堆的,首先看擴容源碼以下:spa

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //釋放獲取的鎖
    Object[] newArray = null;

    //cas成功則擴容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64則擴容新增oldcap+2,否者擴容50%,而且最大爲MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // 若是一開始容量很小,則擴容寬度變大
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // 可能溢出
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一個線程cas成功後,第二個線程會進入這個地方,而後第二個線程讓出cpu,儘可能讓第一個線程執行下面點獲取鎖,可是這得不到確定的保證。(5)
    if (newArray == null) // 若是兩外一個線程正在分配,則讓出
        Thread.yield();
    lock.lock();//(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow 目的是擴容,這裏要思考下爲啥在擴容前要先釋放鎖,而後使用 cas 控制只有一個線程能夠擴容成功呢?

其實這裏不先釋放鎖也是能夠的,也就是在整個擴容期間一直持有鎖,可是擴容是須要花時間的,若是擴容的時候還佔用鎖,那麼其餘線程在這個時候是不能進行出隊和入隊操做的,

這大大下降了併發性。因此爲了提升性能,使用CAS控制只有一個線程能夠進行擴容,而且在擴容前釋放了鎖,讓其餘線程能夠進行入隊和出隊操做。

 

spinlock鎖使用CAS控制只有一個線程能夠進行擴容,CAS失敗的線程會調用Thread.yield() 讓出 cpu,目的是爲了讓擴容線程擴容後優先調用 lock.lock 從新獲取鎖,

可是這得不到必定的保證。有可能yield的線程在擴容線程擴容完成前已經退出,並執行了代碼(6)獲取到了鎖。若是當前數組擴容還沒完畢,當前線程會再次調用tryGrow方法,

而後釋放鎖,這又給擴容線程獲取鎖提供了機會,若是這時候擴容線程還沒擴容完畢,則當前線程釋放鎖後又調用yield方法讓出CPU。可知當擴容線程進行擴容期間,

其餘線程是原地自旋經過代碼(1)檢查當前擴容是否完畢,等擴容完畢後才退出代碼(1)的循環。

 

當擴容線程擴容完畢後會重置自旋鎖變量allocationSpinLock 爲 0,這裏並無使用UNSAFE方法的CAS進行設置是由於同時只可能有一個線程獲取了該鎖,而且 allocationSpinLock 被修飾爲了 volatile。

當擴容線程擴容完畢後會執行代碼 (6) 獲取鎖,獲取鎖後複製當前 queue 裏面的元素到新數組。

 

接下來咱們看看建堆算法,源碼以下:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;

    //隊列元素個數>0則判斷插入位置,否者直接入隊(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}

接下來用圖來解釋上面的算法過程,假設隊列初始化容量爲2,建立的優先級隊列的泛型參數爲Integer。

首先調用隊列offer(2)方法,但願插入元素2到隊列,插入前隊列狀態以下圖所示:

 

 首先執行代碼(1),從上圖變量值能夠知道判斷值爲false,因此緊接着執行代碼(2),因爲 k=n=size=0 因此代碼(7)判斷結果爲 false,因此會執行代碼(8)直接把元素 2 入隊,最後執行代碼(9)設置 size 的值加 1,這時候隊列的狀態以下圖:

而後調用隊列的 offer(4) 時候,首先執行代碼(1),從上圖變量值可知判斷爲 false,因此執行代碼(2),因爲 k=1, 因此進入 while 循環,因爲 parent=0;e=2;key=4; 默認元素比較器是使用元素的 compareTo 方法,

可知 key>e 因此執行 break 退出 siftUpComparable 中的循環; 而後把元素存到數組下標爲 1 的地方,最後執行代碼(9)設置 size 的值加 1,這時候隊列狀態爲:

而後調用隊列的offer(6) 時候,首先執行代碼(1),從上圖變量值知道這時候判斷值爲true,因此嗲用tryGrow進行數組擴容,因爲2 < 64 因此newCap=2 + (2+2)=6; 而後建立新數組並拷貝,

而後調用siftUpComparable 方法,因爲 k=2>0 進入 while 循環,因爲 parent=0;e=2;key=6;key>e 因此 break 後退出 while 循環; 並把元素 6 放入數組下標爲 2 的地方,最後設置 size 的值加 1,如今隊列狀態:

而後調用隊列的 offer(1) 時候,首先執行代碼(1),從上圖變量值知道此次判斷值爲 false,因此執行代碼(2),因爲k=3, 因此進入 while 循環,因爲parent=0;e=4;key=1; key<e,因此把元素 4 複製到數組下標爲 3 的地方,

而後 k=0 退出 while 循環;而後把元素 1 存放到下標爲 0 地方,如今狀態:

此時此刻的二叉樹堆的樹形圖以下:

可知堆的根元素是 1,也就是這是一個最小堆,那麼當調用這個優先級隊列的 poll 方法時候,會一次返回堆裏面值最小的元素。

 

  2.poll操做,poll 操做做用是獲取隊列內部堆樹的根節點元素,若是隊列爲空,則返回 null。源碼以下:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();//獲取獨佔鎖
    try {
        return dequeue();
    } finally {
        lock.unlock();//釋放獨佔鎖
    }
}

如上代碼能夠知道在進行出隊操做過程當中要先加鎖,這意味着,當前線程進行出隊操做的時候,其餘線程不能再進行入隊和出隊操做,可是從前面介紹offer函數的時候,知道這時候能夠有其餘線程進行擴容,

接下來,咱們要看一下出隊操做的dequeue方法的源碼以下:

private E dequeue() {

    //隊列爲空,則返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {

        //獲取隊頭元素(1)
        Object[] array = queue;
        E result = (E) array[0];

        //獲取隊尾元素,並值null(2)
        E x = (E) array[n];
        array[n] = null;

        Comparator<? super E> cmp = comparator;
        if (cmp == null)//(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;//(4)
        return result;
    }
}

如上代碼,若是隊列爲空則直接返回 null,否者執行代碼(1)獲取數組第一個元素做爲返回值存放到變量 Result,這裏要注意一下數組裏面第一個元素是優先級最小或者最大的元素,出隊操做就是返回這個元素。

而後代碼(2)獲取隊列尾部元素存放到變量X,而且置空尾部節點,而後執行代碼(3)插入變量X 到數組下標爲 0 的位置後,從新調整堆爲最大或者最小堆,而後返回。

這裏重要的是看如何去掉堆的根節點後,使用剩下的節點從新調整爲一個最大或者最小堆。

接下來咱們看看siftDownComparable 的源碼,以下:

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // 假設左邊子樹最小
                Object c = array[child];(5int right = child + 1;(6)
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)(8)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;(9)
        }
    }

下面咱們結合圖來模擬上面調整堆的算法過程,接着上節隊列的狀態繼續講解,上節隊列元素序列爲 1,2,6,4:

第一次調用隊列的 poll() 方法時候,首先執行代碼(1)(2),這時候變量 size =4;n=3;result=1;x=4; 這時候隊列狀態圖以下:

而後執行代碼(3),調整堆後隊列狀態圖,以下:

第二次調用隊列的 poll() 方法時候,首先執行代碼(1)(2),這時候變量 size =3;n=2;result=2;x=6; 這時候隊列狀態圖,以下:

而後執行代碼(3)調整堆後隊列狀態圖,以下:

第三次調用隊列的 poll() 方法時候,首先執行代碼(1)(2),這時候變量 size =2;n=1;result=4;x=6; 這時候隊列狀態圖,以下:

而後執行代碼(3)調整堆後隊列狀態圖,以下:

第四次直接返回元素 6.

 

 

接下來重點說說 siftDownComparable 這個調整堆的算法: 首先說下堆調整的思路,因爲隊列數組第 0 個元素爲樹根,出隊時候要被移除,這時候數組就不在是最小堆了,因此須要調整堆,

具體是要從被移除的樹根的左右子樹中找一個最小的值來當樹根,左右子樹又會看本身做爲樹根節點的樹的左右子樹裏面哪一個是最小值,這是一個遞歸的過程,直到樹葉節點結束遞歸,

若是不明白,下面結合圖形來講明,假如當前隊列內容以下:

對應的二叉堆樹以下:

這時候若是調用了 poll(); 那麼 result=2;x=11;隊列末尾的元素設置爲 null 後,剩下的元素調整堆的步驟以下圖:

 

 

如上圖(1)樹根的 leftChildVal = 4;rightChildVal = 6; 4<6; 因此 c=4; 而後 11>4 也就是 key>c;因此使用元素 4 覆蓋樹根節點的值,如今堆對應的樹如圖(2)。

而後樹根的左子樹樹根的左右孩子節點中 leftChildVal = 8;rightChildVal = 10; 8<10; 因此 c=8; 而後發現 11>8 也就是 key>c;因此元素 8 做爲樹根左子樹的根節點,如今樹的形狀如圖(3),

這時候判斷 k<half 爲 false 就會退出循環,而後把 x=11 設置到數組下標爲 3 的地方,這時候堆樹如圖(4),至此調整堆完畢,siftDownComparable 返回 result=2,poll 方法也返回了。

 

 

  3.put操做,put 操做內部調用的 offer, 因爲是無界隊列,因此不須要阻塞,源碼以下:

public void put(E e) {
    offer(e); // never need to block
}

 

  4.take 操做,take 操做做用是獲取隊列內部堆樹的根節點元素,若是隊列爲空則阻塞,源碼以下:

public E take() throws InterruptedException {
    //獲取鎖,可被中斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //若是隊列爲空,則阻塞,把當前線程放入notEmpty的條件隊列
        while ( (result = dequeue()) == null)
            notEmpty.await();//阻塞當前線程
    } finally {
        lock.unlock();//釋放鎖
    }
    return result;
}

如上代碼,首先經過 lock.lockInterruptibly() 獲取獨佔鎖,這個方式獲取的鎖是對中斷進行響應的。而後調用 dequeue 方法返回堆樹根節點元素,若是隊列爲空,則返回 false,

而後當前線程調用 notEmpty.await() 阻塞掛起當前線程,直到有線程調用了 offer()方法(offer 方法內在添加元素成功後調用了 notEmpty.signal 方法會激活一個阻塞在 notEmpty 的條件隊列裏面的一個線程)。

另外這裏使用 while 而不是 if 是爲了不虛假喚醒。

 

  5.size操做,獲取隊列元個數,以下代碼,在返回 size 前加了鎖,保證在調用 size() 方法時候不會有其它線程進行入隊和出隊操做,另外因爲 size 變量沒有被修飾爲 volatie,這裏加鎖也保證了多線程下 size 變量的內存可見性。源碼以下:

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

 

總結:PriorityBlockingQueue 隊列內部使用二叉樹堆維護元素優先級,內部使用數組做爲元素存儲的數據結構,這個數組是能夠擴容的,當前元素個數 >= 最大容量的時候會經過算法擴容,

出隊的時候始終保證出隊的元素是堆樹的根節點,而不是在隊列裏面停留時間最長的元素,默認元素優先級比較規則是使用元素的compareTo方法來作,用戶能夠自定義優先級的比較優先級。

相關文章
相關標籤/搜索