Java併發編程筆記——J.U.C之collections框架:ConcurrentLinkedQueue

一:ConcurrentLinkedQueue簡介

ConcurrentLinkedQueue是線程安全的無界非阻塞隊列,其底層數據結構使用單向鏈表實現,對於入隊和出隊操做使用CAS來實現線程安全。java

Doug Lea在實現ConcurrentLinkedQueue時,並無利用鎖或底層同步原語,而是徹底基於自旋+CAS的方式實現了該隊列。回想一下AQS,AQS內部的CLH等待隊列也是利用了這種方式。node

因爲是徹底基於無鎖算法實現的,因此當出現多個線程同時進行修改隊列的操做(好比同時入隊),極可能出現CAS修改失敗的狀況,那麼失敗的線程會進入下一次自旋,再嘗試入隊操做,直到成功。算法

因此,在併發量適中的狀況下,ConcurrentLinkedQueue通常具備較好的性能。編程

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

二:下面是ConcurrentLinkedQueue的類圖結構

三:segmentfault

三:ConcurrentLinkedQueue原理

隊列結構

咱們來看下ConcurrentLinkedQueue的內部結構:安全

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
 
    /**
     * 隊列頭指針
     */
    private transient volatile Node<E> head;
 
    /**
     * 隊列尾指針.
     */
    private transient volatile Node<E> tail;
 
    // Unsafe mechanics
     
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
     
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    /**
     * 隊列結點定義
     */
    private static class Node<E> {
        volatile E item;        // 元素值
        volatile Node<E> next;  // 後驅指針
 
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
 
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
 
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
 
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
 
        // Unsafe mechanics
 
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
 
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
 
    //...
}

能夠看到,ConcurrentLinkedQueue內部就是一個簡單的單鏈表結構,每入隊一個元素就是插入一個Node類型的結點。數據結構

字段head指向隊列頭,tail指向隊列尾,經過Unsafe來CAS操做字段值以及node對象的字段值。多線程

構造器的定義

ConcurrentLinkedQueue包含兩種構造器:併發

//構建一個空隊列(head,tail均指向一個佔位結點)
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
//根據以有集合構造隊列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

咱們不妨先看下空構造器,經過空構造器創建的ConcurrentLinkedQueue對象,其head和tail指針並不是指向null,而是指向一個item值爲null的node結點,以下圖:函數

入隊操做

元素的入隊是在隊尾插入元素,ConcurrentLinkedQueue的入隊代碼很是簡單,卻也很是精妙:

尾添加offer操做是在隊列末一個元素,若是傳遞的參數是null則拋出NPE異常,不然因爲ConcurrentLinkedQueue是無界隊列,該方法會一直返回true。

另外,因爲使用CAS算法,所以該方法不會阻塞掛起調用線程。下面具體看下實現原理。

public boolean add(E e) {//入隊一個元素
    return offer(e);
}
//在隊尾入隊元素e,直到成功
public boolean offer(E e) {
    //e爲null,則拋出空指針異常
    checkNotNull(e);    

    //構造node結點,在構造函數內部調用unsafe.putObject
    final Node<E> newNode = new Node<E>(e);    

    //從尾結點進行插入
    for (Node<E> t = tail, p = t;;) {   
        Node<E> q = p.next;

         //CASE1:q==null 說明p是尾結點,則直接插入
        if (q == null) {   
            //使用CAS設置p結點的next結點
            if (p.casNext(null, newNode)) {    
                //CAS成功,則說明新增結點已經放入鏈表,而後設置當前尾結點(包含head,第1,3,4....個結點爲尾結點)
                if (p != t) // hop two nodes at a time    //CAS競爭失敗的線程會在下一次自旋中進入該邏輯
                    casTail(t, newNode);  // Failure is OK.    //從新設置隊尾指針tail
                return true;
            }    //CAS競爭失敗則進入下一次自旋
        }
        else if (p == q)    //CASE2:發生了出隊操做
            //多線程操做時,因爲poll操做移除元素後,可能會把head變爲自引用,也就是head的next變成了head,因此這裏須要從新找新的head
            p = (t != (t = tail)) ? t : head;
        else
            //尋找尾結點
            p = (p != t && t != (t = tail)) ? t : q;    //將p從新指向隊尾結點
    }
}

咱們來分析下offer方法的實現。單線程的狀況下,元素入隊比較好理解,直接線性地在隊首插入元素便可。咱們假設有兩個線程ThreadA和ThreadB同時進行入隊地操做。

①ThreadA先單獨入隊兩個元素九、2

此時隊列地結構以下:

②ThreadA入隊元素「10」,ThreadB入隊元素「25」

此時ThreadA和ThreadB若併發執行,咱們看下會發生什麼:

一、ThreadA和ThreadB同時進入自旋中的如下代碼塊:

if (q == null) {
    if (p.casNext(null, newNode)) {    //CASE1:正常狀況下,新結點直接插入到隊尾
       //CAS競爭插入成功
        if (p != t) // hop two nodes at a time//CAS競爭失敗地線程會在下一次自旋中進入該邏輯
            casTail(t, newNode);  // Failure is OK.    //從新設置隊尾指針tail
        return true;
    //CAS競爭插入失敗則進入下一次自旋
    }

二、ThreadA執行cas操做(p.casNext)成功,插入新結點「10」

ThreadA執行完成後,直接返回true,隊列結構以下:

三、ThreadB執行cas操做(p.casNext)失敗

因爲CAS操做同時修改隊尾元素,致使ThreadB操做失敗,則ThreadB進入下一次自旋;
在下一次自旋中,進入如下代碼塊:

else
    // Check for tail updates after two hops.
    p = (p != t && t != (t = tail)) ? t : q;    //將p從新指向隊尾結點

上述分支的做用就是讓p指針從新定位到隊尾結點,此時隊列結構以下:

而後ThreadB會繼續下一次自旋,並再次進入如下代碼塊:

if (q == null) {
    // p is last node
    if (p.casNext(null, newNode)) {
        // Successful CAS is the linearization point
        // for e to become an element of this queue,
        // and for newNode to become "live".
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }
    // Lost CAS race to another thread; re-read next
}

此時,CAS操做成功,隊列結構以下:

因爲此時p!=t ,因此會調用casTail方法從新設置隊尾指針:

private boolean casTail(Node<E> cmp, Node<E> val) {    //從新設置隊尾指針tail
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

這個分支只有在元素入隊的同時,針對該元素也發生了「出隊」操做纔會執行,咱們後面會分析元素的「出隊」,理解了「出隊」操做再回頭來看這個分支就容易理解不少了。

出隊操做

隊列中元素的「出隊」是從隊首移除元素,咱們來看下ConcurrentLinkedQueue是如何實現出隊的:

//在隊首出隊元素,直到成功
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {//CASE2:隊首是非哨兵結點(item!=null)
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);//CASE1:隊首是一個哨兵結點(item==null)
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

仍是經過示例來看,假設初始的隊列結構以下:

①ThreadA先單獨進行出隊操做

因爲head所指的是item==null的結點,因此ThreadA會執行如下分支:

else
    p = q;

而後進入下一次自旋,在自旋中執行如下分支,若是CAS操做成功,則移除首個有效元素,並從新設置頭指針:

if (item != null && p.casItem(item, null)) {    //CASE2:隊首是非哨兵結點
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

此時隊列的結構以下:

若是ThreadA的CAS操做失敗呢?

CAS操做失敗則會進入如下分支,並從新開始自旋:

else if (p == q)
    continue restartFromHead;

最終前面兩個null結點會被GC回收,隊列結構以下:

②ThreadA繼續進行出隊操做

ThreadA繼續執行「出隊」操做,仍是執行如下分支:

if (item != null && p.casItem(item, null)) {
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

可是此時p==h,因此僅將頭結點置null,這實際上是一種「懶刪除」的策略。

出隊元素「2」:

出隊元素「10」:

最終隊列結果以下:

③ThreadA進行出隊,其它線程進行入隊

這是最特殊的一種狀況,當隊列中只剩下一個元素時,若是同時發生出隊和入隊操做,會致使隊列出現下面這種結構:(假設ThreadA進行出隊元素「25」,ThreadB進行入隊元素「11」)

此時tail.next=tail自身,因此ThreadB在執行入隊時,會進入到offer方法的如下分支:

else if (p == q)    //CASE2:發生出隊操做
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;

3、總結

ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法來保證線程併發訪問時的數據一致性。因爲隊列自己是一種鏈表結構,因此雖然算法看起來很簡單,但其實須要考慮各類併發的狀況,實現複雜度較高,而且ConcurrentLinkedQueue不具有實時的數據一致性,實際運用中,隊列通常在生產者-消費者的場景下使用得較多,因此ConcurrentLinkedQueue的使用場景並不如阻塞隊列那麼多。

另外,關於ConcurrentLinkedQueue還有如下須要注意的幾點:

  1. ConcurrentLinkedQueue的迭代器是弱一致性的,這在併發容器中是比較廣泛的現象,主要是指在一個線程在遍歷隊列結點而另外一個線程嘗試對某個隊列結點進行修改的話不會拋出ConcurrentModificationException,這也就形成在遍歷某個還沒有被修改的結點時,在next方法返回時能夠看到該結點的修改,但在遍歷後再對該結點修改時就看不到這種變化。
  2. size方法須要遍歷鏈表,因此在併發狀況下,其結果不必定是準確的,只能供參考。

參考書籍

Java併發編程之美

參考連接

http://www.javashuo.com/article/p-trqovday-gk.html

相關文章
相關標籤/搜索