解讀 Java 併發隊列 BlockingQueue

最近得空,想寫篇文章好好說說 java 線程池問題,我相信不少人都只知其一;不知其二的,包括我本身在仔仔細細看源碼以前,也有許多的不解,甚至有些地方我一直都沒有理解到位。
java

說到線程池實現,那麼就不得不涉及到各類 BlockingQueue 的實現,那麼我想就 BlockingQueue 的問題和你們分享分享我瞭解的一些知識。node

本文沒有像以前分析 AQS 那樣一行一行源碼分析了,不過仍是把其中最重要和最難理解的代碼說了一遍,因此難免篇幅略長。本文涉及到比較多的 Doug Lea 對 BlockingQueue 的設計思想,但願有心的讀者真的能夠有一些收穫,我以爲本身仍是寫了一些乾貨的。算法

本文直接參考 Doug Lea 寫的 Java doc 和註釋,這也是咱們在學習 java 併發包時最好的材料了。但願你們能有所思、有所悟,學習 Doug Lea 的代碼風格,並將其優雅、嚴謹的做風應用到咱們寫的每一行代碼中。api

BlockingQueue

開篇先介紹下 BlockingQueue 這個接口的規則,後面再看其實現。數組

首先,最基本的來講, BlockingQueue 是一個先進先出的隊列(Queue),爲何說是阻塞(Blocking)的呢?是由於 BlockingQueue 支持當獲取隊列元素可是隊列爲空時,會阻塞等待隊列中有元素再返回;也支持添加元素時,若是隊列已滿,那麼等到隊列能夠放入新元素時再放入。安全

BlockingQueue 是一個接口,繼承自 Queue,因此其實現類也能夠做爲 Queue 的實現來使用,而 Queue 又繼承自 Collection 接口。bash

BlockingQueue 對插入操做、移除操做、獲取元素操做提供了四種不一樣的方法用於不一樣的場景中使用:一、拋出異常;二、返回特殊值(null 或 true/false,取決於具體的操做);三、阻塞等待此操做,直到這個操做成功;四、阻塞等待此操做,直到成功或者超時指定時間。總結以下:數據結構

Throws exception
Special value
Blocks
Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek()
not applicable
not applicable

BlockingQueue 的各個實現都遵循了這些規則,固然咱們也不用死記這個表格,知道有這麼回事,而後寫代碼的時候根據本身的須要去看方法的註釋來選取合適的方法便可。多線程

對於 BlockingQueue,咱們的關注點應該在 put(e) 和 take() 這兩個方法,由於這兩個方法是帶阻塞的。併發

BlockingQueue 不接受 null 值的插入,相應的方法在碰到 null 的插入時會拋出 NullPointerException 異常。null 值在這裏一般用於做爲特殊值返回(表格中的第三列),表明 poll 失敗。因此,若是容許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷究竟是表明失敗,仍是獲取的值就是 null 值。

一個 BlockingQueue 多是有界的,若是在插入的時候,發現隊列滿了,那麼 put 操做將會阻塞。一般,在這裏咱們說的無界隊列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。

BlockingQueue 是設計用來實現生產者-消費者隊列的,固然,你也能夠將它當作普通的 Collection 來用,前面說了,它實現了 java.util.Collection 接口。例如,咱們能夠用 remove(x) 來刪除任意一個元素,可是,這類操做一般並不高效,因此儘可能只在少數的場合使用,好比一條消息已經入隊,可是須要作取消操做的時候。

BlockingQueue 的實現都是線程安全的,可是批量的集合操做如 addAll, containsAll, retainAllremoveAll 不必定是原子操做。如 addAll(c) 有可能在添加了一些元素後中途拋出異常,此時 BlockingQueue 中已經添加了部分元素,這個是容許的,取決於具體的實現。

BlockingQueue 不支持 close 或 shutdown 等關閉操做,由於開發者可能但願不會有新的元素添加進去,此特性取決於具體的實現,不作強制約束。

最後,BlockingQueue 在生產者-消費者的場景中,是支持多消費者和多生產者的,說的其實就是線程安全問題。

相信上面說的每一句都很清楚了,BlockingQueue 是一個比較簡單的線程安全容器,下面我會分析其具體的在 JDK 中的實現,這裏又到了 Doug Lea 表演時間了。

BlockingQueue 實現之 ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界隊列實現類,底層採用數組來實現。

其併發控制採用可重入鎖來控制,無論是插入操做仍是讀取操做,都須要獲取到鎖才能進行操做。

若是讀者看過我以前寫的《一行一行源碼分析清楚 AbstractQueuedSynchronizer(二)》 的關於 Condition 的文章的話,那麼你必定能很容易看懂 ArrayBlockingQueue 的源碼,它採用一個 ReentrantLock 和相應的兩個 Condition 來實現。

ArrayBlockingQueue 共有如下幾個屬性:

// 用於存放元素的數組
final Object[] items;
// 下一次讀取操做的位置
int takeIndex;
// 下一次寫入操做的位置
int putIndex;
// 隊列中的元素數量
int count;

// 如下幾個就是控制併發用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
複製代碼

咱們用個示意圖來描述其同步機制:

array-blocking-queue

ArrayBlockingQueue 實現併發同步的原理就是,讀操做和寫操做都須要獲取到 AQS 獨佔鎖才能進行操做。若是隊列爲空,這個時候讀操做的線程進入到讀線程隊列排隊,等待寫線程寫入新的元素,而後喚醒讀線程隊列的第一個等待線程。若是隊列已滿,這個時候寫操做的線程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,而後喚醒寫線程隊列的第一個等待線程。

對於 ArrayBlockingQueue,咱們能夠在構造的時候指定如下三個參數:

  1. 隊列容量,其限制了隊列中最多容許的元素個數;
  2. 指定獨佔鎖是公平鎖仍是非公平鎖。非公平鎖的吞吐量比較高,公平鎖能夠保證每次都是等待最久的線程獲取到鎖;
  3. 能夠指定用一個集合來初始化,將此集合中的元素在構造方法期間就先添加到隊列中。

更具體的源碼我就不進行分析了,由於它就是 AbstractQueuedSynchronizer 中 Condition 的使用,感興趣的讀者請看我寫的《一行一行源碼分析清楚 AbstractQueuedSynchronizer(二)》,由於只要看懂了那篇文章,ArrayBlockingQueue 的代碼就沒有分析的必要了,固然,若是你徹底不懂 Condition,那麼基本上也就能夠說看不懂 ArrayBlockingQueue 的源碼了。

BlockingQueue 實現之 LinkedBlockingQueue

底層基於單向鏈表實現的阻塞隊列,能夠當作無界隊列也能夠當作有界隊列來使用。看構造方法:

// 傳說中的無界隊列
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
複製代碼
// 傳說中的有界隊列
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
複製代碼

咱們看看這個類有哪些屬性:

// 隊列容量
private final int capacity;

// 隊列中的元素數量
private final AtomicInteger count = new AtomicInteger(0);

// 隊頭
private transient Node<E> head;

// 隊尾
private transient Node<E> last;

// take, poll, peek 等讀操做的方法須要獲取到這個鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 若是讀操做的時候隊列是空的,那麼等待 notEmpty 條件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等寫操做的方法須要獲取到這個鎖
private final ReentrantLock putLock = new ReentrantLock();

// 若是寫操做的時候隊列是滿的,那麼等待 notFull 條件
private final Condition notFull = putLock.newCondition();
複製代碼

這裏用了兩個鎖,兩個 Condition,簡單介紹以下:

takeLock 和 notEmpty 怎麼搭配:若是要獲取(take)一個元素,須要獲取 takeLock 鎖,可是獲取了鎖還不夠,若是隊列此時爲空,還須要隊列不爲空(notEmpty)這個條件(Condition)。

putLock 須要和 notFull 搭配:若是要插入(put)一個元素,須要獲取 putLock 鎖,可是獲取了鎖還不夠,若是隊列此時已滿,還須要隊列不是滿的(notFull)這個條件(Condition)。

首先,這裏用一個示意圖來看看 LinkedBlockingQueue 的併發讀寫控制,而後再開始分析源碼:

linked-blocking-queue

看懂這個示意圖,源碼也就簡單了,讀操做是排好隊的,寫操做也是排好隊的,惟一的併發問題在於一個寫操做和一個讀操做同時進行,只要控制好這個就能夠了。

先上構造方法:

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
複製代碼

注意,這裏會初始化一個空的頭結點,那麼第一個元素入隊的時候,隊列中就會有兩個元素。讀取元素時,也老是獲取頭節點後面的一個節點。count 的計數值不包括這個頭節點。

咱們來看下 put 方法是怎麼將元素插入到隊尾的:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 若是你糾結這裏爲何是 -1,能夠看看 offer 方法。這就是個標識成功、失敗的標誌而已。
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 必需要獲取到 putLock 才能夠進行插入操做
    putLock.lockInterruptibly();
    try {
        // 若是隊列滿,等待 notFull 的條件知足。
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入隊
        enqueue(node);
        // count 原子加 1,c 仍是加 1 前的值
        c = count.getAndIncrement();
        // 若是這個元素入隊後,還有至少一個槽可使用,調用 notFull.signal() 喚醒等待線程。
        // 哪些線程會等待在 notFull 這個 Condition 上呢?
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 入隊後,釋放掉 putLock
        putLock.unlock();
    }
    // 若是 c == 0,那麼表明隊列在這個元素入隊前是空的(不包括head空節點),
    // 那麼全部的讀線程都在等待 notEmpty 這個條件,等待喚醒,這裏作一次喚醒操做
    if (c == 0)
        signalNotEmpty();
}

// 入隊的代碼很是簡單,就是將 last 屬性指向這個新元素,而且讓原隊尾的 next 指向這個元素
// 這裏入隊沒有併發問題,由於只有獲取到 putLock 獨佔鎖之後,才能夠進行此操做
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// 元素入隊後,若是須要,調用這個方法喚醒讀線程來讀
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
複製代碼

咱們再看看 take 方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 首先,須要獲取到 takeLock 才能進行出隊操做
    takeLock.lockInterruptibly();
    try {
        // 若是隊列爲空,等待 notEmpty 這個條件知足再繼續執行
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出隊
        x = dequeue();
        // count 進行原子減 1
        c = count.getAndDecrement();
        // 若是此次出隊後,隊列中至少還有一個元素,那麼調用 notEmpty.signal() 喚醒其餘的讀線程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 出隊後釋放掉 takeLock
        takeLock.unlock();
    }
    // 若是 c == capacity,那麼說明在這個 take 方法發生的時候,隊列是滿的
    // 既然出隊了一個,那麼意味着隊列不滿了,喚醒寫線程去寫
    if (c == capacity)
        signalNotFull();
    return x;
}
// 取隊頭,出隊
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // 以前說了,頭結點是空的
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    // 設置這個爲新的頭結點
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
// 元素出隊後,若是須要,調用這個方法喚醒寫線程來寫
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
複製代碼

源碼分析就到這裏結束了吧,畢竟仍是比較簡單的源碼,基本上只要讀者認真點都看得懂。

BlockingQueue 實現之 SynchronousQueue

它是一個特殊的隊列,它的名字其實就蘊含了它的特徵 - - 同步的隊列。爲何說是同步的呢?這裏說的並非多線程的併發問題,而是由於當一個線程往隊列中寫入一個元素時,寫入操做不會當即返回,須要等待另外一個線程來將這個元素拿走;同理,當一個讀線程作讀操做的時候,一樣須要一個相匹配的寫線程的寫操做。這裏的 Synchronous 指的就是讀線程和寫線程須要同步,一個讀線程匹配一個寫線程。

咱們比較少使用到 SynchronousQueue 這個類,不過它在線程池的實現類 ScheduledThreadPoolExecutor 中獲得了應用,感興趣的讀者能夠在看完這個後去看看相應的使用。

雖然上面我說了隊列,可是 SynchronousQueue 的隊列實際上是虛的,其不提供任何空間(一個都沒有)來存儲元素。數據必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。

你不能在 SynchronousQueue 中使用 peek 方法(在這裏這個方法直接返回 null),peek 方法的語義是隻讀取不移除,顯然,這個方法的語義是不符合 SynchronousQueue 的特徵的。SynchronousQueue 也不能被迭代,由於根本就沒有元素能夠拿來迭代的。雖然 SynchronousQueue 間接地實現了 Collection 接口,可是若是你將其當作 Collection 來用的話,那麼集合是空的。固然,這個類也是不容許傳遞 null 值的(併發包中的容器類好像都不支持插入 null 值,由於 null 值每每用做其餘用途,好比用於方法的返回值表明操做失敗)。

接下來,咱們來看看具體的源碼實現吧,它的源碼不是很簡單的那種,咱們須要先搞清楚它的設計思想。

源碼加註釋大概有 1200 行,咱們先看大框架:

// 構造時,咱們能夠指定公平模式仍是非公平模式,區別以後再說
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}
abstract static class Transferer {
    // 從方法名上大概就知道,這個方法用於轉移元素,從生產者手上轉到消費者手上
    // 也能夠被動地,消費者調用這個方法來從生產者手上取元素
    // 第一個參數 e 若是不是 null,表明場景爲:將元素從生產者轉移給消費者
    // 若是是 null,表明消費者等待生產者提供元素,而後返回值就是相應的生產者提供的元素
    // 第二個參數表明是否設置超時,若是設置超時,超時時間是第三個參數的值
    // 返回值若是是 null,表明超時,或者中斷。具體是哪一個,能夠經過檢測中斷狀態獲得。
    abstract Object transfer(Object e, boolean timed, long nanos);
}
複製代碼

Transferer 有兩個內部實現類,是由於構造 SynchronousQueue 的時候,咱們能夠指定公平策略。公平模式意味着,全部的讀寫線程都遵照先來後到,FIFO 嘛,對應 TransferQueue。而非公平模式則對應 TransferStack。

synchronous-queue

咱們先採用公平模式分析源碼,而後再說說公平模式和非公平模式的區別。

接下來,咱們看看 put 方法和 take 方法:

// 寫入值
public void put(E o) throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, false, 0) == null) { // 1
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 讀取值並移除
public E take() throws InterruptedException {
    Object e = transferer.transfer(null, false, 0); // 2
    if (e != null)
        return (E)e;
    Thread.interrupted();
    throw new InterruptedException();
}
複製代碼

咱們看到,寫操做 put(E o) 和讀操做 take() 都是調用 Transferer.transfer(…) 方法,區別在於第一個參數是否爲 null 值。

咱們來看看 transfer 的設計思路,其基本算法以下:

  1. 當調用這個方法時,若是隊列是空的,或者隊列中的節點和當前的線程操做類型一致(如當前操做是 put 操做,而隊列中的元素也都是寫線程)。這種狀況下,將當前線程加入到等待隊列便可。
  2. 若是隊列中有等待節點,並且與當前操做能夠匹配(如隊列中都是讀操做線程,當前線程是寫操做線程,反之亦然)。這種狀況下,匹配等待隊列的隊頭,出隊,返回相應數據。

其實這裏有個隱含的條件被知足了,隊列若是不爲空,確定都是同種類型的節點,要麼都是讀操做,要麼都是寫操做。這個就要看究竟是讀線程積壓了,仍是寫線程積壓了。

咱們能夠假設出一個男女配對的場景:一個男的過來,若是一我的都沒有,那麼他須要等待;若是發現有一堆男的在等待,那麼他須要排到隊列後面;若是發現是一堆女的在排隊,那麼他直接牽走隊頭的那個女的。

既然這裏說到了等待隊列,咱們先看看其實現,也就是 QNode:

static final class QNode {
    volatile QNode next;          // 能夠看出來,等待隊列是單向鏈表
    volatile Object item;         // CAS'ed to or from null volatile Thread waiter; // 將線程對象保存在這裏,用於掛起和喚醒 final boolean isData; // 用於判斷是寫線程節點(isData == true),仍是讀線程節點 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } ...... 複製代碼

相信說了這麼多之後,咱們再來看 transfer 方法的代碼就輕鬆多了。

/**
 * Puts or takes an item.
 */
Object transfer(Object e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 隊列空,或隊列中節點類型和當前節點一致,
        // 即咱們說的第一種狀況,將節點入隊便可。讀者要想着這塊 if 裏面方法其實就是入隊
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // t != tail 說明剛剛有節點入隊,continue 便可
            if (t != tail)                  // inconsistent read
                continue;
            // 有其餘節點入隊,可是 tail 仍是指向原來的,此時設置 tail 便可
            if (tn != null) {               // lagging tail
                // 這個方法就是:若是 tail 此時爲 t 的話,設置爲 tn
                advanceTail(t, tn);
                continue;
            }
            // 
            if (timed && nanos <= 0)        // can't wait return null; if (s == null) s = new QNode(e, isData); // 將當前節點,插入到 tail 的後面 if (!t.casNext(null, s)) // failed to link in continue; // 將當前節點設置爲新的 tail advanceTail(t, s); // swing tail and wait // 看到這裏,請讀者先往下滑到這個方法,看完了之後再回來這裏,思路也就不會斷了 Object x = awaitFulfill(s, e, timed, nanos); // 到這裏,說明以前入隊的線程被喚醒了,準備往下執行 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? x : e; // 這裏的 else 分支就是上面說的第二種狀況,有相應的讀或寫相匹配的狀況 } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? x : e; } } } void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } 複製代碼
// 自旋或阻塞,直到知足條件,這個方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {

    long lastTime = timed ? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    // 判斷須要自旋的次數,
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 若是被中斷了,那麼取消這個節點
        if (w.isInterrupted())
            // 就是將當前節點 s 中的 item 屬性設置爲 this
            s.tryCancel(e);
        Object x = s.item;
        // 這裏是這個方法的惟一的出口
        if (x != e)
            return x;
        // 若是須要,檢測是否超時
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        // 若是自旋達到了最大的次數,那麼檢測
        else if (s.waiter == null)
            s.waiter = w;
        // 若是自旋到了最大的次數,那麼線程掛起,等待喚醒
        else if (!timed)
            LockSupport.park(this);
        // spinForTimeoutThreshold 這個以前講 AQS 的時候其實也說過,剩餘時間小於這個閾值的時候,就
        // 不要進行掛起了,自旋的性能會比較好
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
複製代碼

Doug Lea 的巧妙之處在於,將各個代碼湊在了一塊兒,使得代碼很是簡潔,固然也同時增長了咱們的閱讀負擔,看代碼的時候,仍是得仔細想一想各類可能的狀況。

下面,再說說前面說的公平模式和非公平模式的區別。

相信你們內心面已經有了公平模式的工做流程的概念了,我就簡單說說 TransferStack 的算法,就不分析源碼了。

  1. 當調用這個方法時,若是隊列是空的,或者隊列中的節點和當前的線程操做類型一致(如當前操做是 put 操做,而棧中的元素也都是寫線程)。這種狀況下,將當前線程加入到等待棧中,等待配對。而後返回相應的元素,或者若是被取消了的話,返回 null。
  2. 若是棧中有等待節點,並且與當前操做能夠匹配(如棧裏面都是讀操做線程,當前線程是寫操做線程,反之亦然)。將當前節點壓入棧頂,和棧中的節點進行匹配,而後將這兩個節點出棧。配對和出棧的動做其實也不是必須的,由於下面的一條會執行一樣的事情。
  3. 若是棧頂是進行匹配而入棧的節點,幫助其進行匹配並出棧,而後再繼續操做。

應該說,TransferStack 的源碼要比 TransferQueue 的複雜一些,若是讀者感興趣,請自行進行源碼閱讀。

BlockingQueue 實現之 PriorityBlockingQueue

帶排序的 BlockingQueue 實現,其併發控制採用的是 ReentrantLock,隊列爲無界隊列(ArrayBlockingQueue 是有界隊列,LinkedBlockingQueue 也能夠經過在構造函數中傳入 capacity 指定隊列最大的容量,可是 PriorityBlockingQueue 只能指定初始的隊列大小,後面插入元素的時候,若是空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的線程安全版本。不能夠插入 null 值,同時,插入隊列的對象必須是可比較大小的(comparable),不然報 ClassCastException 異常。它的插入操做 put 方法不會 block,由於它是無界隊列(take 方法在隊列爲空的時候會阻塞)。

它的源碼相對比較簡單,本節將介紹其核心源碼部分。

咱們來看看它有哪些屬性:

// 構造方法中,若是不指定大小的話,默認大小爲 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 數組的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 這個就是存放數據的數組
private transient Object[] queue;

// 隊列當前大小
private transient int size;

// 大小比較器,若是按照天然序排序,那麼此屬性可設置爲 null
private transient Comparator<? super E> comparator;

// 併發控制所用的鎖,全部的 public 且涉及到線程安全的方法,都必須先獲取到這個鎖
private final ReentrantLock lock;

// 這個很好理解,其實例由上面的 lock 屬性建立
private final Condition notEmpty;

// 這個也是用於鎖,用於數組擴容的時候,須要先獲取到這個鎖,才能進行擴容操做
// 其使用 CAS 操做
private transient volatile int allocationSpinLock;

// 用於序列化和反序列化的時候用,對於 PriorityBlockingQueue 咱們應該比較少使用到序列化
private PriorityQueue q;
複製代碼

此類實現了 Collection 和 Iterator 接口中的全部接口方法,對其對象進行迭代並遍歷時,不能保證有序性。若是你想要實現有序遍歷,建議採用 Arrays.sort(queue.toArray()) 進行處理。PriorityBlockingQueue 提供了 drainTo 方法用於將部分或所有元素有序地填充(準確說是轉移,會刪除原隊列中的元素)到另外一個集合中。還有一個須要說明的是,若是兩個對象的優先級相同(compare 方法返回 0),此隊列並不保證它們之間的順序。

PriorityBlockingQueue 使用了基於數組的二叉堆來存放元素,全部的 public 方法採用同一個 lock 進行併發控制。

二叉堆:一顆徹底二叉樹,它很是適合用數組進行存儲,對於數組中的元素 a[i],其左子節點爲 a[2

i+1],其右子節點爲 a[2\
i + 2],其父節點爲 a[(i-1)/2],其堆序性質爲,每一個節點的值都小於其左右子節點的值。二叉堆中最小的值就是根節點,可是刪除根節點是比較麻煩的,由於須要調整樹。

簡單用個圖解釋一下二叉堆,我就不說太多專業的嚴謹的術語了,這種數據結構的優勢是一目瞭然的,最小的元素必定是根元素,它是一棵滿的樹,除了最後一層,最後一層的節點從左到右緊密排列。

priority-blocking-queue-1

下面開始 PriorityBlockingQueue 的源碼分析,首先咱們來看看構造方法:

// 默認構造方法,採用默認值(11)來進行初始化
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
// 指定數組的初始大小
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
// 指定比較器
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
// 在構造方法中就先填充指定的集合中的元素
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    // 
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); } 複製代碼

接下來,咱們來看看其內部的自動擴容實現:

private void tryGrow(Object[] array, int oldCap) {
    // 這邊作了釋放鎖的操做
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 用 CAS 操做將 allocationSpinLock 由 0 變爲 1,也算是獲取鎖
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 若是節點個數小於 64,那麼增長的 oldCap + 2 的容量
            // 若是節點數大於等於 64,那麼增長 oldCap 的一半
            // 因此節點數較小時,增加得快一些
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) :
                                   (oldCap >> 1));
            // 這裏有可能溢出
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 若是 queue != array,那麼說明有其餘線程給 queue 分配了其餘的空間
            if (newCap > oldCap && queue == array)
                // 分配一個新的大數組
                newArray = new Object[newCap];
        } finally {
            // 重置,也就是釋放鎖
            allocationSpinLock = 0;
        }
    }
    // 若是有其餘的線程也在作擴容的操做
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 從新獲取鎖
    lock.lock();
    // 將原來數組中的元素複製到新分配的大數組中
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
複製代碼

擴容方法對併發的控制也很是的巧妙,釋放了原來的獨佔鎖 lock,這樣的話,擴容操做和讀操做能夠同時進行,提升吞吐量。

下面,咱們來分析下寫操做 put 方法和讀操做 take 方法。

public void put(E e) {
    // 直接調用 offer 方法,由於前面咱們也說了,在這裏,put 方法不會阻塞
    offer(e); 
}
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // 首先獲取到獨佔鎖
    lock.lock();
    int n, cap;
    Object[] array;
    // 若是當前隊列中的元素個數 >= 數組的大小,那麼須要擴容了
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // 節點添加到二叉堆中
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 更新 size
        size = n + 1;
        // 喚醒等待的讀線程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
複製代碼

對於二叉堆而言,插入一個節點是簡單的,插入的節點若是比父節點小,交換它們,而後繼續和父節點比較。

// 這個方法就是將數據 x 插入到數組 array 的位置 k 處,而後再調整樹
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 二叉堆中 a[k] 節點的父節點位置
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}
複製代碼

咱們用圖來示意一下,咱們接下來要將 11 插入到隊列中,看看 siftUp 是怎麼操做的。

priority-blocking-queue-2

咱們再看看 take 方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 獨佔鎖
    lock.lockInterruptibly();
    E result;
    try {
        // dequeue 出隊
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
複製代碼
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 隊頭,用於返回
        E result = (E) array[0];
        // 隊尾元素先取出
        E x = (E) array[n];
        // 隊尾置空
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}
複製代碼

dequeue 方法返回隊頭,並調整二叉堆的樹,調用這個方法必須先獲取獨佔鎖。

廢話很少說,出隊是很是簡單的,由於隊頭就是最小的元素,對應的是數組的第一個元素。難點是隊頭出隊後,須要調整樹。

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        // 這裏獲得的 half 確定是非葉節點
        // a[n] 是最後一個元素,其父節點是 a[(n-1)/2]。因此 n >>> 1 表明的節點確定不是葉子節點
        // 下面,咱們結合圖來一行行分析,這樣比較直觀簡單
        // 此時 k 爲 0, x 爲 17,n 爲 9
        int half = n >>> 1; // 獲得 half = 4
        while (k < half) {
            // 先取左子節點
            int child = (k << 1) + 1; // 獲得 child = 1
            Object c = array[child];  // c = 12
            int right = child + 1;  // right = 2
            // 若是右子節點存在,並且比左子節點小
            // 此時 array[right] = 20,因此條件不知足
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key = 17, c = 12,因此條件不知足
            if (key.compareTo((T) c) <= 0)
                break;
            // 把 12 填充到根節點
            array[k] = c;
            // k 賦值後爲 1
            k = child;
            // 一輪事後,咱們發現,12 左邊的子樹和剛剛的差很少,都是缺乏根節點,接下來處理就簡單了
        }
        array[k] = key;
    }
}
複製代碼

priority-blocking-queue-3

記住二叉堆是一棵徹底二叉樹,那麼根節點 10 拿掉後,最後面的元素 17 必須找到合適的地方放置。首先,17 和 10 不能直接交換,那麼先將根節點 10 的左右子節點中較小的節點往上滑,即 12 往上滑,而後原來 12 留下了一個空節點,而後再把這個空節點的較小的子節點往上滑,即 13 往上滑,最後,留出了位子,17 補上便可。

我稍微調整下這個樹,以便讀者能更明白:

priority-blocking-queue-4

好了, PriorityBlockingQueue 咱們也說完了。

總結

我知道本文過長,相信一字不漏看完的讀者確定是少數。

ArrayBlockingQueue 底層是數組,有界隊列,若是咱們要使用生產者-消費者模式,這是很是好的選擇。

LinkedBlockingQueue 底層是鏈表,能夠當作無界和有界隊列來使用,因此你們不要覺得它就是無界隊列。

SynchronousQueue 自己不帶有空間來存儲任何元素,使用上能夠選擇公平模式和非公平模式。

PriorityBlockingQueue 是無界隊列,基於數組,數據結構爲二叉堆,數組第一個也是樹的根節點老是最小值。

(全文完)

相關文章
相關標籤/搜索