死磕java concurrent包系列(五)基於AQS的條件隊列把LinkedBlockingQueue「扒光」

LinkedBlockingQueue的基礎

LinkedBlockingQueue是一個基於鏈表的阻塞隊列,實際使用上與ArrayBlockingQueue徹底同樣,咱們只須要把以前烤雞的例子中的Queue對象替換一下便可。若是對於ArrayBlockingQueue不熟悉,能夠去看看https://juejin.im/post/5c0f79f3f265da61561f1becjava

LinkedBlockingQueue源碼分析

源碼在node上註釋寫明瞭,它是基於一個「two lock queue」算法實現的,感興趣的同窗能夠參考這篇paper:www.cs.rochester.edu/u/scott/pap… 這篇文章爲了提高在多處理器的機器上的更好性能的併發而提出了這個算法,其中心思想是:經過兩把鎖分別控制併發,入隊時:只須要鎖Tail Node,出隊時,只須要鎖Head Node。 回到LinkedBlockingQueue,先看看內部成員變量:node

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
複製代碼

每一個添加到LinkedBlockingQueue隊列中的數據都將被封裝成Node節點(這個node不一樣於AQS中的node,它是一個單向鏈表),其中head和last分別指向隊列的頭結點和尾結點。與ArrayBlockingQueue不一樣的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對併發進行控制,也就是說,添加和刪除操做並非互斥操做,能夠同時進行,這樣也就能夠大大提升吞吐量。這裏再次強調若是沒有給LinkedBlockingQueue指定容量大小,其默認值將是Integer.MAX_VALUE,若是存在添加速度大於刪除速度時候,有可能會內存溢出,這點在使用前但願慎重考慮。至於LinkedBlockingQueue的實現原理圖與ArrayBlockingQueue是相似的,除了對添加和移除方法使用單獨的鎖控制外,二者都使用了不一樣的Condition條件對象做爲等待隊列,用於掛起take線程和put線程。 總結以下圖: 算法

image.png

LinkedBlockingQueue的阻塞添加

一樣的,添加的方法主要有:add offer 和put。咱們先看看非阻塞添加的add和offer方法,這兩個方法的區別一樣是添加失敗時,add方法是拋異常,offer方法是返回false數組

public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
}
複製代碼
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //由於存在併發操做移出和入隊互不衝突,與arrayBlockingQueue不一樣,count被聲明爲Atomic
        final AtomicInteger count = this.count;
        //隊列滿了直接返回
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //由於存在併發問題,加鎖以後再次判斷一下隊列有沒有滿
            if (count.get() < capacity) {
                //入隊
                enqueue(node);
                //容量+1返回舊值
                c = count.getAndIncrement();
                //由於在入隊時可能同時有出隊的線程同時把元素移除,因此在入隊後作一個補償,
                //若是隊列還有空間,那麼喚醒一個如歸的線程執行添加操做
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        //c==0,只有可能最開始就是一個空隊列(注意上面的c返回的是舊值)此時由於恰好添加了一個元素,
        //因此喚醒消費的線程去取移出元素
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
複製代碼
//入隊操做
private void enqueue(Node<E> node) {
     //隊列尾節點指向新的node節點
     last = last.next = node;
}

//signalNotEmpty方法去喚醒移出元素的線程,爲何要先獲取鎖才能signal呢?不懂的同窗回去看看AQS:
//由於條件隊列是基於AQS的鎖存在的,用法上必需要這麼用,不然會拋出異常
private void signalNotEmpty() {
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
          //喚醒獲取並刪除元素的線程
          notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
  }

複製代碼

這裏的Offer()方法作了兩件事:bash

  • 第一件事是判斷隊列是否滿,滿了就直接釋放鎖,沒滿就將節點封裝成Node入隊,而後加鎖後再次判斷隊列添加完成後是否已滿,不滿就繼續喚醒等到在條件對象notFull上的添加線程。
  • 第二件事是,判斷是否須要喚醒等到在notEmpty條件對象上的消費線程。

接下來看看put方法,與offer方法一模一樣:併發

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //鎖可被中斷
        putLock.lockInterruptibly();
        try {
          //隊列滿時加入notFull條件隊列
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            //隊列尚未滿時,繼續喚醒添加線程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //c==0,只有可能最開始就是一個空隊列(注意上面的c返回的是舊值)此時由於恰好添加了一個元素,
        //因此喚醒消費的線程去取移出元素
        if (c == 0)
            signalNotEmpty();
    }

複製代碼

這裏有幾個問題:高併發

問題1:
爲何添加完成後是繼續喚醒在條件隊列notFull上的添加線程而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件對象上的消費線程?源碼分析

分析1: 先回想一下ArrayBlockingQueue:它內部只有一個鎖,在內部完成添加元素操做後直接喚醒消費線程去消費。若是ArrayBlockingQueue在添加元素以後再喚醒添加線程的話,消費的線程就可能一直被block,沒法執行。 而爲了不這種狀況,對於LinkedBlockingQueue來講,他有兩個鎖,添加和刪除元素不是互斥的,添加的過程當中可能已經刪除好幾個元素了,因此他在設計上要儘量的去喚醒兩個條件隊列。 添加線程在隊列沒有滿時本身直接喚醒本身的其餘添加線程,若是沒有等待的添加線程,直接結束了。若是有就直到隊列元素已滿才結束掛起。注意消費線程的執行過程也是如此。這也是爲何LinkedBlockingQueue的吞吐量要相對大些的緣由。post

問題2: 爲何if (c == 0)時纔去喚醒消費線程呢性能

分析2: 什麼狀況下c等於0呢?c值是添加元素前隊列的大小,也就是說,以前是空隊列,空隊列時會有什麼狀況呢,空隊列會阻塞全部的take進程,將其封裝到notEmpty的條件隊列中。這個時候,c以前是0,如今在執行了enqueue方法後,隊列中有元素了,因此他須要當即喚醒阻塞的take進程,不然阻塞的take進程就一直block在隊列裏,一直沉睡下去。 爲何c>0時,就不會喚醒呢?由於take方法和put方法同樣,take方法每次take完元素後,若是隊列還有值,它會繼續喚醒take隊列,也就是說他只要沒有被await()阻塞,他就會一直不斷的喚醒take線程,而不須要再添加的時候再去喚醒,形成沒必要要的性能浪費

LinkedBlockingQueue的阻塞移出

相對的,咱們再看看take方法:

public E take() throws InterruptedException {
        E x;
        int c = -1;
        //獲取當前隊列大小
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可中斷
        try {
            //若是隊列沒有數據,當前take線程到條件隊列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //若是存在數據直接刪除並返回該數據
            x = dequeue();
            c = count.getAndDecrement();//隊列大小減1,返回以前的值
            if (c > 1)
                notEmpty.signal();//還有數據就喚醒後續的消費線程
        } finally {
            takeLock.unlock();
        }
        //知足條件(以前隊列是滿的,如今剛剛執行dequeue拿出了一個),
        //喚醒條件對象上等待隊列中的添加線程
        if (c == capacity)
            signalNotFull();
        return x;
    }

private E dequeue() {
        Node<E> h = head;//獲取頭結點
        Node<E> first = h.next; //獲取頭結的下一個節點(要刪除的節點)
        h.next = h; // help GC//本身next指向本身,即被刪除
        head = first;//更新頭結點
        E x = first.item;//獲取刪除節點的值
        first.item = null;//清空數據,由於first變成頭結點是不能帶數據的,這樣也就刪除隊列的帶數據的第一個節點
        return x;
    }


複製代碼

take方法是一個可阻塞可中斷的移除方法,主要作了兩件事:

  • 若是隊列沒有數據就掛起當前線程到 notEmpty條件對象的等待隊列中一直等待,若是有數據就刪除節點並返回數據項,同時喚醒後續消費線程;
  • 嘗試喚醒條件對象notFull上等待隊列中的添加線程:假設以前隊列中滿員了,那麼新來的put進程將會被阻塞進notFull條件隊列,而後await掛起沉睡。這個時候有線程經過take方法拿出了一個元素,若是此時不喚醒notFull條件隊列,那麼以前滿員時隊列中的線程就會一直睡死過去

總結

LinkedBlockingQueue的兩個隊列:

  • notFull條件隊列(隊列滿時阻塞的put線程): await的時機:隊列滿了 signal的時機:一是put方法放入元素後,若是隊列還有空位,會singal線程繼續添加;二是若是隊列最開始滿員,take方法移出了一個元素後,隊列還有一個空位時也會喚醒它。

  • notEmpty條件隊列(隊列空時候阻塞的take線程): await的時機:隊列空了 signal的時機:一是take方法移出元素後,若是隊列還有空位,會singal線程繼續移出;二是若是隊列最開始空的,put方法放入了一個元素後,隊列還有一個元素時也會喚醒它。

這種算法就是「two lock queue」的設計思想,這也是LinkedBlockingQueue的吞吐量較高的本質緣由

ArrayBlockingQueue和LinkedBlockingQueue的比較總結

經過上述的分析,對於LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及內部實現原理咱們已較爲熟悉了,這裏咱們就對它們兩間的區別來個小結

1.隊列大小和構造方法有所不一樣,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue能夠是有界的也能夠是無界的(Integer.MAX_VALUE),對於後者而言,當添加速度大於移除速度時,在無界的狀況下,可能會形成內存溢出等問題,有坑。

2.數據存儲容器不一樣,ArrayBlockingQueue採用的是數組做爲數據存儲容器,而LinkedBlockingQueue採用的則是以Node節點做爲鏈接對象的單向鏈表。

3.從GC的角度分析:因爲ArrayBlockingQueue採用的是數組的存儲容器,所以在插入或刪除元素時不會產生或銷燬任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內須要高效併發地處理大批量數據的時,對於GC可能存在較大影響。

4.二者的實現隊列添加或移除的鎖不同,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操做和移除操做採用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加採用的是putLock,移除採用的則是takeLock,這樣能大大提升隊列的吞吐量,也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。

相關文章
相關標籤/搜索