Java多線程進階(三三)—— J.U.C之collections框架:LinkedBlockingQueue

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、LinkedBlockingQueue簡介

LinkedBlockingQueue是在JDK1.5時,隨着J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於單鏈表實現:java

clipboard.png

LinkedBlockingQueue是一種近似有界阻塞隊列,爲何說近似?由於LinkedBlockingQueue既能夠在初始構造時就指定隊列的容量,也能夠不指定,若是不指定,那麼它的容量大小默認爲Integer.MAX_VALUEnode

LinkedBlockingQueue除了底層數據結構(單鏈表)與ArrayBlockingQueue不一樣外,另一個特色就是:
它維護了兩把鎖——takeLockputLock
takeLock用於控制出隊的併發,putLock用於入隊的併發。這也就意味着,同一時刻,只能只有一個線程能執行入隊/出隊操做,其他入隊/出隊線程會被阻塞;可是,入隊和出隊之間能夠併發執行,即同一時刻,能夠同時有一個線程進行入隊,另外一個線程進行出隊,這樣就能夠提高吞吐量。segmentfault

在ArrayBlockingQueue章節中,咱們說過,ArrayBlockingQueue維護了一把全局鎖,不管是出隊仍是入隊,都共用這把鎖,這就致使任一時間點只有一個線程可以執行。那麼對於「生產者-消費者」模式來講,意味着生產者和消費者不能併發執行。

2、LinkedBlockingQueue原理

構造

LinkedBlockingQueue提供了三種構造器:數組

/**
 * 默認構造器.
 * 隊列容量爲Integer.MAX_VALUE.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 顯示指定隊列容量的構造器
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
/**
 * 從已有集合構造隊列.
 * 隊列容量爲Integer.MAX_VALUE
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();                     // 這裏加鎖僅僅是爲了保證可見性
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)              // 隊列不能包含null元素
                throw new NullPointerException();
            if (n == capacity)          // 隊列已滿
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));    // 隊尾插入元素
            ++n;
        }
        count.set(n);                   // 設置元素個數
    } finally {
        putLock.unlock();
    }
}

能夠看到,若是不指定容量,那麼它的容量大小默認爲Integer.MAX_VALUE。另外,LinkedBlockingQueue使用了一個原子變量AtomicInteger記錄隊列中元素的個數,以保證入隊/出隊併發修改元素時的數據一致性。數據結構

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 隊列容量.
     * 若是不指定, 則爲Integer.MAX_VALUE
     */
    private final int capacity;

    /**
     * 隊列中的元素個數
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 隊首指針.
     * head.item == null
     */
    transient Node<E> head;

    /**
     * 隊尾指針.
     * last.next == null
     */
    private transient Node<E> last;

    /**
     * 出隊鎖
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * 隊列空時,出隊線程在該條件隊列等待
     */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * 入隊鎖
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * 隊列滿時,入隊線程在該條件隊列等待
     */
    private final Condition notFull = putLock.newCondition();

    /**
     * 鏈表結點定義
     */
    static class Node<E> {
        E item;

        Node<E> next;   // 後驅指針

        Node(E x) {
            item = x;
        }
    }

    //...
}

構造完成後,LinkedBlockingQueue的初始結構以下:併發

clipboard.png

插入部分元素後的LinkedBlockingQueue結構:性能

clipboard.png

核心方法

因爲接口和ArrayBlockingQueue徹底同樣,因此LinkedBlockingQueue會阻塞線程的方法也一共有4個:put(E e)offer(e, time, unit)take()poll(time, unit),咱們先來看插入元素的方法。this

插入元素——put(E e)spa

/**
 * 在隊尾插入指定的元素.
 * 若是隊列已滿,則阻塞線程.
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();            // 獲取「入隊鎖」
    try {
        while (count.get() == capacity) {   // 隊列已滿, 則線程在notFull上等待
            notFull.await();
        }
        enqueue(node);                      // 將新結點連接到「隊尾」

        /**
         * c+1 表示的元素個數.
         * 若是,則喚醒一個「入隊線程」
         */
        c = count.getAndIncrement();        // c表示入隊前的隊列元素個數
        if (c + 1 < capacity)               // 入隊後隊列未滿, 則喚醒一個「入隊線程」
            notFull.signal();
    } finally {
        putLock.unlock();
    }

    if (c == 0)                             // 隊列初始爲空, 則喚醒一個「出隊線程」
        signalNotEmpty();
}

插入元素時,首先須要得到「入隊鎖」,若是隊列滿了,則當前線程須要在notFull條件隊列等待;不然,將新元素連接到隊列尾部。線程

這裏須要注意的是兩個地方:

①每入隊一個元素後,若是隊列還沒滿,則須要喚醒其它可能正在等待的「入隊線程」:

/**
 * c+1 表示的元素個數.
 * 若是,則喚醒一個「入隊線程」
 */
c = count.getAndIncrement();        // c表示入隊前的隊列元素個數
if (c + 1 < capacity)               // 入隊後隊列未滿, 則喚醒一個「入隊線程」
    notFull.signal();

② 每入隊一個元素,都要判斷下隊列是否空了,若是空了,說明可能存在正在等待的「出隊線程」,須要喚醒它:

if (c == 0)                             // 隊列爲空, 則喚醒一個「出隊線程」
    signalNotEmpty();
這裏爲何不像ArrayBlockingQueue那樣,入隊完成後,直接喚醒一個在notEmpty上等待的出隊線程?

由於ArrayBlockingQueue中,入隊/出隊用的是同一把鎖,二者不會併發執行,因此每入隊一個元素(拿到鎖),就能夠通知可能正在等待的「出隊線程」。(同一個鎖的兩個條件隊列:notEmptynotFull

ArrayBlockingQueue中的enqueue方法:

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)     // 隊列已滿,則重置索引爲0
        putIndex = 0;
    count++;                            // 元素個數+1
    notEmpty.signal();                  // 喚醒一個notEmpty上的等待線程(能夠來隊列取元素了)
}

而LinkedBlockingQueue中,入隊/出隊用的是兩把鎖,入隊/出隊是會併發執行的。入隊鎖對應的是notFull條件隊列,出隊鎖對應的是notEmpty條件隊列,因此每入隊一個元素,應當當即去喚醒可能阻塞的其它入隊線程。當隊列爲空時,說明後面再來「出隊線程」,必定都會阻塞,因此此時能夠去喚醒一個出隊線程,以提高性能。

試想如下,若是去掉上面的①和②,當入隊線程拿到「入隊鎖」,入隊元素後,直接嘗試喚醒出隊線程,會要求去拿出隊鎖,這樣持有鎖A的同時,再去嘗試獲取鎖B,極可能引發死鎖,就算經過打破死鎖的條件避免死鎖,每次操做同時獲取兩把鎖也會下降性能。

刪除元素——table()

刪除元素的邏輯和插入元素相似。刪除元素時,首先須要得到「出隊鎖」,若是隊列爲空,則當前線程須要在notEmpty條件隊列等待;不然,從隊首出隊一個元素:

/**
 * 從隊首出隊一個元素
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;   // 獲取「出隊鎖」
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {                  // 隊列爲空, 則阻塞線程
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();                // c表示出隊前的元素個數
        if (c > 1)                                  // 出隊前隊列非空, 則喚醒一個出隊線程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)                              // 隊列初始爲滿,則喚醒一個入隊線程
        signalNotFull();
    return x;
}
/**
 * 隊首出隊一個元素.
 */
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h;         // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

上面須要的注意點和插入元素同樣:
①每出隊一個元素前,若是隊列非空,則須要喚醒其它可能正在等待的「出隊線程」:

c = count.getAndDecrement();                // c表示出隊前的元素個數
if (c > 1)                                  // 出隊前隊列非空, 則喚醒一個出隊線程
    notEmpty.signal();

② 每入隊一個元素,都要判斷下隊列是否滿,若是是滿的,說明可能存在正在等待的「入隊線程」,須要喚醒它:

if (c == capacity)                              // 隊列初始爲滿,則喚醒一個入隊線程
    signalNotFull();

3、總結

概括一下,LinkedBlockingQueueArrayBlockingQueue比較主要有如下區別:

  1. 隊列大小不一樣。ArrayBlockingQueue初始構造時必須指定大小,而LinkedBlockingQueue構造時既能夠指定大小,也能夠不指定(默認爲Integer.MAX_VALUE,近似於無界);
  2. 底層數據結構不一樣。ArrayBlockingQueue底層採用數組做爲數據存儲容器,而LinkedBlockingQueue底層採用單鏈表做爲數據存儲容器;
  3. 二者的加鎖機制不一樣。ArrayBlockingQueue使用一把全局鎖,即入隊和出隊使用同一個ReentrantLock鎖;而LinkedBlockingQueue進行了鎖分離,入隊使用一個ReentrantLock鎖(putLock),出隊使用另外一個ReentrantLock鎖(takeLock);
  4. LinkedBlockingQueue不能指定公平/非公平策略(默認都是非公平),而ArrayBlockingQueue能夠指定策略。
相關文章
相關標籤/搜索