JDK源碼那些事兒之PriorityBlockingQueue

今天繼續說一說阻塞隊列的實現,今天的主角就是優先級阻塞隊列PriorityBlockingQueue,從命名上看以爲應該是有序的,畢竟是優先級隊列,那麼其實是什麼狀況,咱們一塊兒看下其內部實現,提早說明下,由於PriorityBlockingQueue涉及到了堆排序的相關使用,若是沒了解清楚,能夠參考我以前寫的關於堆排序的相關說明java

前言

JDK版本號:1.8.0_171

PriorityBlockingQueue是一個無限容量的阻塞隊列,固然,最終仍是受內存限制,內部實現是數組,不停增加下去會致使OOM,因爲其無限容量的特性,在入隊操做時不存在阻塞這個說法,只要內存足夠都能入隊,固然,入隊操做線程仍是須要爭搶互斥鎖的,只是不會存在隊列已滿狀況下的阻塞等待操做算法

同時,雖然這個被稱爲優先級阻塞隊列,可是入隊操做以後並不會當即進行排序調整,只有在出隊操做或drainTo轉移隊列時纔是被優先級隊列排過序的。PriorityBlockingQueue是經過Comparator來進行排序,因此入隊的對象自己已經實現Comparator接口,或者傳入一個Comparator實例對象才能夠api

PriorityBlockingQueue排序是經過最小堆實現的,以前的文章裏我已經專門說明了堆排序的算法,這裏再也不詳細說明,不明白的能夠先去參考堆排序的講解部分。優先級隊列初始容量默認爲11,當入隊空間不足時會進行擴容操做,擴容大小根據擴容前的容量決定數組

類定義

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

PriorityBlockingQueue關係圖

常量/變量

/**
     * 默認初始化數組長度11
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 容許的最大數組長度,減8是由於有可能部分虛擬機會用一部分空間來保存對象頭信息
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 
     * 優先級隊列經過平衡二叉堆實現,可類比堆排序算法
     * 那麼queue[n]對應的左右子節點分別爲queue[2*n+1]和[2*(n+1)]
     * 隊列中的對象必須是可比較的,默認的天然排序或自行實現的的Comparator均可
     * 隊列非空,則queue[0]爲最小值,即以最小二叉堆排序
     */
    private transient Object[] queue;

    /**
     * 優先級隊列queue包含的元素個數
     */
    private transient int size;

    /**
     * 比較器對象,在使用天然排序比較時爲null
     */
    private transient Comparator<? super E> comparator;

    /**
     * 互斥鎖,只有一個ReentrantLock
     */
    private final ReentrantLock lock;

    /**
     * 非空信號量,隊列爲空時阻塞出隊線程
     * 只須要判斷隊列爲空的狀況,隊列沒有滿的狀況,因此纔是無限容量隊列
     */
    private final Condition notEmpty;

    /**
     * Spin鎖,經過CAS操做實現
     */
    private transient volatile int allocationSpinLock;

    /**
     * 在序列化中使用,爲了兼容老版本
     */
    private PriorityQueue<E> q;

allocationSpinLock在對象中的內存偏移量獲取在靜態代碼塊中實現以下,後續使用CAS操做用到oop

// Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long allocationSpinLockOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = PriorityBlockingQueue.class;
            allocationSpinLockOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("allocationSpinLock"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

構造方法

在不傳參時,默認初始化數組長度爲11,即優先級隊列默認容量爲11,主要在於傳入集合參數時須要進行判斷是否知足PriorityBlockingQueue使用的條件,非空,可比較的對象,另外,還需判斷是否須要進行堆化操做ui

public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        // 是否須要從新排序標識,即堆化標識
        boolean heapify = true; // true if not known to be in heap order
        // 空值檢查標識
        boolean screen = true;  // true if must screen for nulls
        // 集合爲SortedSet,則使用其Comparator排序,因爲其已有序,直接複製便可,無需堆化操做
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        // 集合爲PriorityBlockingQueue,則使用其Comparator排序
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            // 精確到PriorityBlockingQueue類,因爲其已有序,直接複製便可,無需堆化操做
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        // 假如沒有正確返回Object[],則複製a
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        // 對集合進行空值校驗
        // this.comparator != null 判斷SortedSet類型的對象空值狀況
        // n == 1 這個感受應該是對天然排序的對象作的操做 n >= 1 纔對,不然就在heapify()堆化操做比較時拋錯,能夠本身嘗試放List
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        // 堆化操做
        if (heapify)
            heapify();
    }

重要方法

tryGrow

擴容操做,在offer中獲取鎖的時候調用,擴容以前先釋放鎖,經過CAS操做將allocationSpinLock標識置爲1,表示當前正在擴容中,擴容完畢則從新獲取鎖,allocationSpinLock標識置爲0this

private void tryGrow(Object[] array, int oldCap) {
        // 先釋放鎖
        lock.unlock(); // must release and then re-acquire main lock
        // 準備的新數組
        Object[] newArray = null;
        // 其餘線程未進行擴容操做時嘗試使用CAS更新allocationSpinLock標識爲1,成功則當前線程取得擴容操做權限
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 原數組容量小於64,則每次增加oldCap + 2
                // 原數組容量大於等於64,則每次增加oldCap的一半
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // 新的數組容量大於最大的數組長度限制
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    // 原數組容量加1就已經溢出或者超過最大長度限制直接拋出OOM
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    // 設置新數組容量爲最大值
                    newCap = MAX_ARRAY_SIZE;
                }
                // 擴容成功且當前數組沒有被其餘線程操做,則建立一個新數組
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                // 將擴容標識恢復
                allocationSpinLock = 0;
            }
        }
        // 其餘線程已經在擴容了,讓出cpu
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 從新得到鎖
        lock.lock();
        // 擴容成功且原數組沒被其餘線程操做則複製原數組到新數組中
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

siftUpComparable/siftUpUsingComparator

相似堆排序的操做,不一樣在於,這些方法是類比插入新節點,即數組中添加新的值時調用,添加完以後整個堆須要進行調整,Up也說明了是從下往上進行堆的平衡調整。在調用這個方法前,堆應該是已經平衡的,若是未平衡,須要先進行堆化操做,參考heapify方法。入隊操做時,將x插入k的位置上,入隊時至關於將新元素放入k的位置上(還未徹底執行,須要知足堆特性),因爲新添加元素可能會破壞整個堆,因此須要從下往上調整整個堆,直到x大於等於其父節點或者到達根節點spa

/**
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     */
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            // 找到k位置處的父節點
            int parent = (k - 1) >>> 1;
            // 父節點對應的值
            Object e = array[parent];
            // 父子節點比較,k位置處的節點大於等於其父節點,則退出,不須要對堆進行調整了
            if (key.compareTo((T) e) >= 0)
                break;
            // k位置處的節點小於其父節點,則將k位置處的值改成其父節點值
            array[k] = e;
            // k指向其父節點,至關於堆向上遞進了一層,繼續while判斷其父節點是否須要調整
            k = parent;
        }
        // 結束調整時k指向的位置即爲插入x的值,即key
        array[k] = key;
    }
    
    /**
     * 同上,區別在於這個使用了一個比較對象cmp,上邊是天然排序
     */
    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

siftDownComparable/siftDownUsingComparator

出隊操做時使用,出隊出的是堆頂元素,即array[0],那麼出隊完成以後堆頂元素空缺,將array[n]處的元素放入位置0處(還未真正執行,須要先驗證是否知足堆特性),這裏翻譯說的是插入,能夠這麼理解,出隊時至關於將最後一個葉子節點移動到根(堆頂位置),這裏就須要從上往下調整整個堆,使其知足堆的特性,這裏按小頂堆處理,最上邊則是最小值,需知足節點值小於其兩個子節點的值便可線程

/**
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            // 最後的非葉子節點位置
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                // k的左子節點
                int child = (k << 1) + 1; // assume left child is least
                // k的左子節點對應的值
                Object c = array[child];
                // k的右子節點
                int right = child + 1;
                // 左右子節點中最大值
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                // 對比,其節點值小於等於其左右子節點,則代表堆調整完畢
                if (key.compareTo((T) c) <= 0)
                    break;
                // k節點處的值爲其子節點中最大的值
                array[k] = c;
                // k指向其子節點最大值的那個索引位置
                k = child;
            }
            // 結束調整時k指向的位置即爲插入x的值,即key
            array[k] = key;
        }
    }

    /** 
     * 同上,區別在於這個使用了一個比較對象cmp,上邊是天然排序
     */
    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator<? super T> cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }

dequeue

出隊核心操做,須要先獲取互斥鎖才能執行,出隊元素爲array[0]節點,出隊以後進行堆排序的操做,步驟以下:翻譯

  • 保存array[n]值爲x,清除數組中n處的值
  • 經過siftDownComparable方法將x插入0(即堆頂位置)處操做
  • siftDownComparable自身向下依次去進行整個堆的平衡調整
  • 堆(數組)長度-1
private E dequeue() {
        // 當前隊列元素的長度
        int n = size - 1;
        // 無值 返回null
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 保存堆頂元素array[0]
            E result = (E) array[0];
            // 保存最後一個元素
            E x = (E) array[n];
            // 置空清除最後一個元素
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                // 經過天然排序使得整個堆保持小頂堆的特性,下面說
                siftDownComparable(0, x, array, n);
            else
                // 經過傳入的比較類對象排序使得整個堆保持小頂堆的特性
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

heapify

堆化操做,從最後一個非葉子節點開始,循環對每一個節點平衡,直到堆頂,完成堆的平衡操做

/**
     * 構造方法傳入集合時用到
     * 將集合進行堆化操做,知足堆的特性
     */
    private void heapify() {
        Object[] array = queue;
        int n = size;
        // 最後一個非葉子節點
        int half = (n >>> 1) - 1;
        Comparator<? super E> cmp = comparator;
        if (cmp == null) {
            // 從最後一個非葉子節點開始調整
            // 這裏使用的是siftDownComparable,使得節點及其子節點知足堆特性
            // 逐步向上遍歷,最終使得整個數組知足堆特性
            for (int i = half; i >= 0; i--)
                siftDownComparable(i, (E) array[i], array, n);
        }
        else {
            // 同上,多了個比較對象
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
        }
    }

offer

入隊操做,最終都是調用offer,這裏使用siftUpComparable從下向上調整,由於咱們是將新值放到了隊列最後,應向上進行調整

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        // 得到鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 數組容量不夠時進行擴容操做,上邊已經說過tryGrow這部分
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // 入隊操做,將新節點放入最後,須要使用siftUpComparable從下往上進行調整
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            // 容量+1
            size = n + 1;
            // 隊列有數據則喚醒阻塞的出隊線程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

poll/take

出隊操做,最終調用dequeue,上邊已說過,其他部分同以前講過的阻塞隊列相似,不過多說明

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }

removeAt

刪除隊列中的某個元素,remove和removeEQ方法也是使用這個方法來進行操做的。這裏有個地方須要注意下,在刪除非最後一個節點時須要進行堆調整,把最後一個節點當成新值添加到刪除位置,先經過siftDownComparable/siftDownUsingComparator向下進行平衡調整,若是沒有進行調整,則須要調用siftUpComparable/siftUpUsingComparator向上進行調整,有些人可能不是很明白,其實想下堆的特性就能瞭解,在數組中並非徹底有序的,在最小堆中只要知足父節點小於等於其子節點便可,因此這裏在註釋上我也進行了說明,若是向下調整了,則i處的子節點取代了i,原來i處的節點必定大於等於i的父節點,因此i的子節點也大於等於i的父節點,不須要向上調整了。例以下圖這種狀況,就須要繼續向上調整:

removeAt堆調整

在刪除36節點後,若是把24節點放入刪除後的節點上,此時會致使array[i] == moved,須要向上調整,其實也是由於堆的特性致使,堆只保證了堆頂元素的有序性,其餘元素若是調整則須要從新進行平衡操做

private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        // 移除最後一個節點,不須要調整堆
        if (n == i) // removed last element
            array[i] = null;
        else {
            // 下面至關於刪除隊列中間某個節點進行的操做
            // 相似出隊操做,只不過如今不必定是堆頂
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                // 至關於將n處的節點放入刪除的節點位置i處,以後向下進行堆化平衡操做
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            // 向下無平衡操做,則需向上進行堆化平衡操做
            // 若是向下調整了,則i處的子節點取代了i,原來i處的節點必定大於等於i的父節點,因此i的子節點也大於等於i的父節點
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }

drainTo

轉移maxElements個元素到集合c中,從源碼實現上能夠看到轉移以後的元素是有序的,而不是像PriorityBlockingQueue裏的數組是無序的,每次轉移,先直接添加堆頂元素,再出隊操做,循環調用使得轉移後的集合有序

public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 集合長度
            int n = Math.min(size, maxElements);
            for (int i = 0; i < n; i++) {
                // 先添加
                c.add((E) queue[0]); // In this order, in case add() throws.
                // 出隊操做,堆已平衡
                dequeue();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

迭代器

迭代器使用時的方法以下,每次調用建立一個新的迭代器對象Itr,入參調用了toArray()方法,拷貝了當前數組隊列,不是直接放入的原數組隊列

public Iterator<E> iterator() {
        return new Itr(toArray());
    }
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return Arrays.copyOf(queue, size);
        } finally {
            lock.unlock();
        }
    }

看下其迭代器實現類,保存的是當前數組的一個拷貝,可是remove操做是刪除的PriorityBlockingQueue原數組中對應的元素,須要注意

/**
     * 迭代器,其中的數組是拷貝了當前數組的快照
     */
    final class Itr implements Iterator<E> {
        // 數組,這裏保存的實際上是原數組的快照,參考迭代調用方法
        final Object[] array; // Array of all elements
        // 遊標,下一次next執行時對應的值的索引
        int cursor;           // index of next element to return
        // 上一個next元素索引值,即上一次next()執行返回的那個值的索引,無則爲-1
        int lastRet;          // index of last element, or -1 if no such
        
        // 獲取迭代器時調用,看上邊源碼
        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        public E next() {
            // 遊標指向已超過數組長度,拋錯
            if (cursor >= array.length)
                throw new NoSuchElementException();
            // 更新lastRet,記錄next值索引
            lastRet = cursor;
            // 返回next應該獲取的值,同時遊標索引+1
            return (E)array[cursor++];
        }
            
        public void remove() {
            // 無值拋錯
            if (lastRet < 0)
                throw new IllegalStateException();
            // 這裏調用removeEQ方法進行移除操做,注意,這裏刪除的是PriorityBlockingQueue的原數組中對應的值,不是這個拷貝數組
            removeEQ(array[lastRet]);
            // 置爲-1
            lastRet = -1;
        }
    }

迭代器是原數組的一個快照版本,故也是無序的,若是想經過迭代器獲取有序數組是不可能的,同時,使用時須要注意remove方法,避免誤刪

總結

至此,PriorityBlockingQueue源碼基本說明完畢,須要理解的在於如下幾點:

  • PriorityBlockingQueue是一個數組實現的無限制容量的優先級阻塞隊列
  • 默認初始容量爲11,容量不夠時可進行擴容操做
  • 經過平衡二叉最小堆實現優先級排列
  • take、poll方法出隊或drainTo轉移的集合纔是有序的

以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝

相關文章
相關標籤/搜索