【Java併發編程】—–「J.U.C」:ConcurrentLinkedQueue

前言

在以前的文章中,已經對ArrayBlockingQueueLinkedBlockingQueue這兩個比較經常使用的阻塞隊列作了源碼分析,咱們知道其內部都是經過ReentrantLock來保證數據讀寫的線程安全,經過Condition來完成線程等待和喚醒,只不過ArrayBlockingQueue在讀寫時使用了一把鎖所完成,而LinkedBlockingQueue對於讀和寫分別使用了兩把鎖來進行處理,從而達到讀寫分離的效果。算法

然而,經過鎖機制來實現一個線程安全的隊列,在併發不是特別高的狀況下並非很是合適,由於在大多數狀況下都只有幾個線程同時訪問,而每次執行都須要去加一次鎖,從而致使線程進行上下文切換,影響總體性能。所以,JDK還爲咱們提供了一個無鎖線程安全的隊列——ConcurrentLinkedQueue,其底層使用CAS來實現無阻塞的併發控制。本文將該隊列的實現機制和源碼作一個分析,讓咱們共同看看Doug Lea大神是如何巧妙地經過無鎖機制來實現一個線程安全的隊列。編程

1.ConcurrentLinkedQueue介紹

首先讓咱們看看JDK文檔對該類的描述:安全

ConcurrentLinkedQueue的API描述.png多線程

ConcurrentLinkedQueue是一個基於鏈表、無界、線程安全的隊列。這個隊列將元素按照先進先出的順序進行存儲。隊列的頭節點是在隊列中存在時間最久的節點,隊列的尾節點是在隊列中存在時間最短的節點。新的元素會被插入到隊列的尾部,而隊列的元素的獲取操做則會從隊列的頭部去獲取元素。ConcurrentLinekedQueue適合做爲多個線程共享訪問的集合。與大多數併發集合的實現相似,該類也不容許添加null元素,該類的實現使用了一個高效的無鎖算法,其算法的是基於podc-1996.pdf所改進的。併發

除了上面所描述的基本特性以外,ConcurrentLinkedQueue中還有一些其餘的特色:工具

  • 隊列中的最後一個元素的next屬性老是爲null,而且最後一個節點能夠經過tail節點以時間複雜度爲o(1)的方式到達,也能夠經過head以時間複雜度o(n)的方式到達。即隊列中的最後一個元素,其老是可達的。
  • 節點出隊列,是經過執行CAS操做,將其item設置爲null。這裏與咱們傳統想象的節點刪除不太同樣,它是經過將一個節點的item標記爲null,標記這個節點已經出隊列,在下次操做的時候,若是一旦發現節點的item爲null時,就將該節點的next設置爲本身,從而真正完成節點從隊列的移除,所以節點的刪除是延遲處理的。 (下面咱們在分析節點出隊列的過程時能夠更加清楚地看到其實現細節)
  • 若是隊列中的節點其item不爲null,它們從是能夠從head開始,逐個查找到對應的節點。
  • ConcurrentLinkedQueue的size方法不是一個常量級別的操做,每一次獲取隊列的大小,都須要總體地對隊列作一次遍歷操做,而且獲得值也是一個非精確的值,由於在遍歷的過程當中,隊列的結構可能也會被其餘線程所改變。

2. ConcurrentLinkedQueue內部結構分析

因爲是基於鏈表的實現方式,與其餘的併發隊列相似,都會在內部定義一個節點類,ConcurrentLinedQueu亦是如此,首先咱們看一下節點的定義:源碼分析

//該節點是一個靜態內部類,所以其只能做用於該隊列內部
      private static class Node<E> {
       /*
        * 當前節點存儲的元素,注意到這裏使用volatile關鍵字來對節點進行
        * 修飾,其目的是在併發讀的時候保證內存的可見性
        */
        volatile E item;
        //當前節點的下一個節點
        volatile Node<E> next;

        /**
         * 構建新的節點,這裏沒有使用volatile的方式來對節點的元素值進行設置,而是使用普通的寫方式
         * 由於對於一個新增的節點,只有在其被成功插入到隊列尾部纔對外可見,所以在這裏沒有對數據可見性的強制要求  
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        /*
         *  經過Unsafe來完成對當前節點元素的CAS操做
         */
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        /*
         *  使用普通的方式來設置當前節點的下一個節點
         */
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        /*
         *  經過Unsafe來完成對當前節點的下一個節點的CAS操做
         */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                //經過Unsafe來獲取一個Node節點的item屬性在內存中相對該對象的位置偏移量
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));

                //經過Unsafe來獲取一個Node節點的next節點屬性在內存中相對該對象的位置偏移量
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

上面節點的定義與以前分析的SynchronousQueue中的內部定義的節點很是相似,這裏再也不過多闡述。性能

隊列的屬性定義this

爲了提升快速查找隊列中第一個節點和最後一個節點,所以ConcurrentLinkedQueue中分別定義了一個head節點和tail節點來快速定位。spa

/**
     * 不變性:
     *  - 隊列中全部未刪除的節點均可以經過head節點的succ方法查找到
     *  - head節點必定不可能等於null
     *  - (tmp = head).next != tmp,即head的next不能指向本身。
     * 
     * 可變性:
     * - head的item可能爲null,也可能不爲null
     * - tail節點可能會滯後於head節點,所以從head節點未必必定能夠找到tail節點
     * 
     */
    private transient volatile Node<E> head;
    
    /**
     * 不變性:
     *  - 節點中的最後一個元素老是能夠經過tail的succ方法來獲取
     *  - tail節點不等於null
     * 
     * 可變性:
     *  - head的item可能爲null,也可能不爲null
     *  - tail 節點的next可能指向本身,也可能不指向本身
     *  - tail節點可能會滯後於head節點,所以從head節點未必必定能夠找到tail節點
     */
    private transient volatile Node<E> tail;

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

ConcurrentLinkedQueue源碼分析

經過上面的描述咱們知道了該隊列是經過一個頭節點和一個尾節點,而後將中間連接節點之間兩兩相鏈接構成一個隊列,下面讓咱們分析一下ConcurrentLinkedQueue的內部的具體實現。(在這裏須要說明一下,因爲其節點是徹底地基於Unsafe來完成CAS的操做,若是你對該內容還不是很熟悉的話,能夠參考個人深刻分析Java中的原子操做這篇文章,裏面對原子操做有一個比較細緻的描述。

入隊列源碼分析

在分析源碼以前,咱們先經過幾張圖來講明一下ConcurrentLinkedQueue的入隊列的總體過程。在瞭解完總體的過程後,再結合源碼去分析細節會更加容易理解:

隊列初始化

隊列初始化狀態

在隊列剛建立時,head和tail同時指向空節點,咱們也稱其爲dummy節點。

dummy節點的說明

添加元素a

添加元素a

向隊列中添加元素a,此時只是新節點追加到第一個節點的後面,可是tail節點並未發生改變。

添加元素b

添加元素b

向隊列中添加元素b,此時新節點與原先的tail節點之間的距離大於1,所以tail節點在這個時候會更新,真正的指向了最後一個節點

添加元素c

添加元素c

因爲新節點與tail節點的距離沒有大於1,所以此時tail節點一樣不會發生更新。

添加元素d

添加元素d

經過上面的圖示,咱們能夠看到ConcurrentLinkedQueue在入隊列過程當中很是明顯的一個特色就是tail指針不是實時更新的,即tail節點可能會滯後於隊列中真正的最後一個節點,只有當最後的一個節點與tail節點以前的距離大於1時纔會更新,而這樣設計的目的就是爲了減小避免每增長一個節點,tail節點都須要去執行一次CAS操做的狀況發生。

public boolean offer(E e) {
       // 因爲元素不容許爲null,所以對元素作一個檢查
        checkNotNull(e);
      // 生成待插入的節點
        final Node<E> newNode = new Node<E>(e);
        
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // 當q等於null時,則p就是隊列中的最後一個元素  
            if (q == null) {
               // 執行cas操做,若是執行成功,則newNode成爲隊列的最後一個元素,但它未必是tail指向的元素
                if (p.casNext(null, newNode)) {
                    // 經過判斷p!=t,從而肯定當前新節點與tail指向的節點之間的距離是否大於1
                    // 若是大於1,則須要更新tail指針
                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }
            // 若是p==q,則意味着當前p節點已經被從隊列中移除(若是單純從入隊列看是看不出來的,後面結合出隊列再回頭分析)
            else if (p == q)
                /* 判斷在執行過程當中tail是否發生變化,若是未發生變化,則tail也已經脫落隊列
                 * 由於 p = t = tail,而p已經脫離隊列,從而推斷出tail也脫離了隊列
                 * 那麼此時只能從head開始,從新查找隊列的最後一個元素
                 * 若是tail發生了變化,則直接從當前隊列的tail開始查找隊列的最後一個元素
                 */
                p = (t != (t = tail)) ? t : head;
            else
               /**
                *  因爲p節點的next不爲null,而且p節點並未從隊列中刪除,所以須要繼續查找隊列的最後一個節點
                *  判斷執行過程當中tail節點是否發生了變化
                *  若是發生了變化,則讓p執行當前的tail,不然就讓p直接指向它的next節點q
                */
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

可能會有很多朋友對上面t != (t = tail)的處理感到疑惑,疑惑的緣由可能會以爲一個變量本身和本身比較,那不是必定爲true嘛,怎麼還會出現等於false的可能呢。爲了理解這個問題,咱們一塊兒看一下下面的這段代碼:

public class VarCompareTest {
    static volatile int b = 2;

    public static void main(String[] args) {
        int a = b;
        int c = a != (a = b) ? 5 : 4;
        System.out.println(a);
        System.out.println(c);
    }
}

這段代碼的也用到了上面相似的t != (t = tail),可是在編譯完成後,咱們經過IDEA自己的反編譯工具來查看一下對應的Class文件的結果(目前因爲對字節碼指令不熟悉,所以不從那個角度去解讀):

反編譯Class文件後的結果

 

經過上面的代碼,咱們能夠看到,一開始a的值就等於b的值,其值爲2;緊接着a的值首先經過一個局部變量記錄,而後再將b的值賦值給a,因爲b可能存在多線程修改的可能,此時b的值可能被其餘線程改爲了3,所以a的值也會變爲3,最後再拿2與3進行比較,即var10000 != b進行比較,就會存在等於false的狀況發生了。對於t != (t = tail)的狀況也是如此,相信經過這個例子說明你們應該明白其中的原理了!

出隊列源碼分析

在分析出隊列源碼以前,咱們也結合上面的圖來看一下出隊列的總體過程:

隊列當前狀態

這裏假設隊列是基於上面入隊列以後的狀態進行的。

移除第1個節點

移除第一個節點

本來指向head的節點,此時的next指向了本身,所以它從隊列中真正的移除。存儲a的節點,其item置爲了null,而且head節點發生變動,真正指向了隊列中第一個有效(真正存儲數據)的節點。

移除第2個節點

移除第2個節點

此時只是僅僅將節點的item設置爲了null,可是head節點沒有發生變動,這樣作的目的也是爲了減小一次CAS操做,它會等到下一次纔去變動。

移除第3個節點

移除第3個節點

移除第4個節點

移除第4個節點

看完上圖的分析,相信你們對ConcurrentLinkedQueue出隊列的操做應該有一個直觀的理解了,下面咱們看下源碼的具體實現:

public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                 // 若是元素的item不爲null,則說明p節點是當前隊列中的第一個未被刪除的節點
                //  此時也說明head指向的節點確實是隊列中的第一個元素
                // 經過CAS操做,將item設置爲null,來標記其已經被刪除
                if (item != null && p.casItem(item, null)) {
                   // 判斷p節點與head指向的節點是不是同一個,若是不是則須要將head節點向前移動
                    if (p != h) 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 節點的next爲null,則說明隊列爲空,只有一個dummy節點
                else if ((q = p.next) == null) {
                   // 嘗試將dummy節點p設置爲新的head
                    updateHead(h, p);
                    return null;
                }
                // 若是p節點被刪除,只能從隊列的head從頭開始再次查找
                else if (p == q)
                    // 跳回到最外層的循環,從新執行一次Node<E> h = head, p = h, q;操做
                    continue restartFromHead;
                else
                   // 因爲head指向的節點其item爲null,即head指向的節點不是一個有效節點,所以繼續經過head的next繼續查找
                    p = q;
            }
        }
    }

   final void updateHead(Node<E> h, Node<E> p) {
       // 判斷新設置的節點與原來的head節點是不是同一個,若是不是則將新節點設置爲新的head節點,
       // 而且將原來的head節點的next指向本身。
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

tail節點滯後於head節點的場景分析

通常來講,head節點都在隊列的左邊,而tail節點在隊列的右邊。然而,在ConcurrentLinkedQueue中,可能存在tail節點在左邊,而head節點卻跑到了右邊的狀況,這種場景咱們將其稱爲tail lag behind head。下面咱們分析一下在什麼樣的場景下會發生這樣的狀況,這對於真正理解ConcurrentLinkedQueue很是重要!

添加一個元素

從上圖能夠看到,在添加完一個元素後,tail節點並有發生改變。此時,假設咱們去獲取隊列中的元素,隊列的結構就變成以下的樣子。

取出隊列元素

從上面的結構咱們能夠看到,此時tail節點滯後於head節點,而且咱們此時經過head節點也沒法查找到tail節點,由於該節點已經從隊列中移除。當下一次添加元素的時候,就會出現tail節點本身指向本身的狀況,此時就須要從新獲取到head,將新增的元素追加到head後面。

總結

至此,ConcurrentLinkedQueue的實現咱們已經分析完成了。該類的核心設計就在於CAS的無阻塞以及head/tail節點的延遲更新。儘可能咱們在實際的開發中基本不會去實現一個如此複雜的隊列,可是經過分析一個經典無阻塞隊列,能夠更加好地幫助咱們理解併發編程。若是存在分析不對的地方,還望大神指出。

做者:碼農一枚 連接:https://www.jianshu.com/p/32d6526494fd 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。

相關文章
相關標籤/搜索