ConcurrentLinkedQueue源碼分析

​ 在併發編程中,咱們可能常常須要用到線程安全的隊列,JDK提供了兩種模式的隊列:阻塞隊列和非阻塞隊列。阻塞隊列使用鎖實現,非阻塞隊列使用CAS實現。ConcurrentLinkedQueue是一個基於鏈表實現的無界線程安全隊列,對於。下面看看JDK是如何使用非阻塞的方式來實現線程安全隊列ConcurrentLinkedQueue的。java

成員屬性

​ ConcurrentLinkedQueue由head和tail節點組成,節點與節點之間經過next鏈接,從而來組成一個鏈表結構的隊列。node

private transient volatile Node<E> head;
private transient volatile Node<E> tail;
複製代碼

Node類

​ Node有兩個屬性item和指向下一個節點的next,item和next都被聲明成volatile類型,使用CAS來保證更新的線程安全。編程

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
	//更改Node中的數據域item 
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    //更改Node中的指針域next
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    //更改Node中的指針域next
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
複製代碼

構造方法

​ 默認的無參構造,head和tail默認狀況下指向item爲null的Node哨兵結點。元素入隊時被加入隊尾,出隊時候從隊列頭部獲取一個元素。安全

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
複製代碼

offer方法

​ 在讀源碼並按照其執行流程分析以前,先給個結論:tail不必定指向對象真正的尾節點,後面咱們分析以後會發現這個特色。多線程

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
public boolean offer(E e) {
    //(1)若是e爲null會拋出空指針異常
    checkNotNull(e);
    //(2)建立一個新的Node結點,Node的構造函數中會調用Unsafe類的putObject方法
    final Node<E> newNode = new Node<E>(e);
    //(3)從尾節點插入新的結點
    for (Node<E> t = tail, p = t;;) {
        //q爲尾節點的next結點,可是在多線程中,若是有別的線程修改了tail結點那麼在本線程中能夠看到p!=null(後
        //面的CAS就是這樣作的)
        Node<E> q = p.next;
        //(4)若是q爲null,說明如今p是尾節點,那麼能夠執行添加
        if (q == null) {
            //(5)這裏使用cas設置p結點的next結點爲newNode
            //(傳入null,比較p的next是否爲null,爲null則將next設置爲newNode)
            if (p.casNext(null, newNode)) {
                //(6)下面是更新tail結點的代碼
                //在CAS執行成功以後,p(原鏈表的tail)結點的next已是newNode,這裏就設置tail結點爲newNode
                if (p != t) // hop two nodes at a time
                    // 若是p不等於t,說明有其它線程先一步更新tail
                    // 也就不會走到q==null這個分支了
                    // p取到的多是t後面的值
                    // 把tail原子更新爲新節點
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        //若是被移除了
        else if (p == q)
            //(7)多線程操做的時候,可能會有別的線程使用poll方法移除元素後可能會把head的next變成head,因此這裏須要找到新的head:這裏請參考後面的poll方法的講解圖示進行理解
            p = (t != (t = tail)) ? t : head;
        else
            // (8)查詢尾節點
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
複製代碼

​ 上面是offer方法的實現以及註釋,這裏咱們分爲單線程執行和多線程執行兩種狀況,按照上面的源碼實現一步步分析整個的流程。先討論單線程執行的過程併發

單線程執行

​ 在單線程環境下執行,那麼就直接按照方法實現一步步執行判斷便可,下面經過適當的圖示來講明這個過程框架

  1. 首先當一個線程調用offer方法的時候,在代碼(1)處進行非空檢查,爲null拋出異常,不爲null執行(2)函數

  2. 代碼(2)Node<E> newNode = new Node<E>(e)使用item做爲構造函數的參數,建立一個新的結點性能

  3. 代碼(3)for (Node<E> t = tail, p = t;;)從隊列尾部開始自旋循環,保證從隊列尾部添加新的結點測試

  4. 得到tailnext結點(q),此時的隊列狀況以下圖所示(默認構造方法中將head和tail都指向的是一個item爲null的結點)。此時的q指向的是null

  1. 代碼(4)if (q == null)處執行判斷q==null爲true

  2. 代碼(5)if (p.casNext(null, newNode))處執行的是將p的next結以CAS方式更新爲咱們建立的newNode。(其中CAS會判斷p的next是否爲null,爲null才更新爲newNode

  3. 此時的p==t,因此不會執行更新tail的代碼塊(6)casTail(t, newNode),而是從offer方法退出。這時候隊列狀況以下所示

  1. 那麼這一個線程執行完,但是tail尚未改變呢:實際上第二次進行offer的時候,會發現p=tail,p.next!=null,就會執行代碼(8)p = (p != t && t != (t = tail)) ? t : q,簡單分析一下:

    • p != t:p爲tail,t爲tail,因此爲false
    • t != (t = tail):顯然也是false
  2. 因此結果就是p=q,而後進行下一次循環,以後判斷的p.next就是null,因此能夠CAS成功,也由於p!=t,因此會更新tail結點。

​ 因此上面給的結論在這裏就體現了,即tail並不老是指向隊列的尾節點,那麼實際上也能夠換種方式讓tail指向尾節點,即以下這樣實現

if (e == null)
    throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
    Node<E> t = tail;
    if (t.casNext(null, n) && casTail(t, n)) {
        return true;
    }
}
複製代碼

​ 可是若是大量的入隊操做,那麼每次都須要以CAS方式更新tail指向的結點,當數據量很大的時候對性能的影響是很大的。因此最終實現上,是以減小CAS操做來提升大數量的入隊操做的性能:每間隔1次(tail指向和真正的尾節點之間差1)進行CAS操做更新tail指向尾節點(可是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,由於循環體須要多循環一次來定位出尾節點(將指向真正的尾節點,而後添加newNode))。其實在前面分析成員屬性時候也知道了,tail是被volatile修飾的,而CAS方式本質上仍是對於volatile變量的讀寫操做,而volatile的寫操做開銷大於讀操做的,因此Concurrent Linked Queue的是線上是經過增長對於volatile變量的讀操做次數從而相對的減小對其寫操做。下面是單線程執行offer方法的時候tail指向的變化簡圖示意

多線程執行

​ 上面演示的單個線程的執行,那麼當在多線程環境下執行的話會發生什麼狀況,這裏假設兩個線程併發的執行.

狀況1

這裏分析的其實就是假設多個線程都會執行到CAS更新p.next結點的代碼,咱們下面看一下,假設threadA調用offer(item1),threadB調用offer(item2)都執行到p.casNext(null, newNode)位置處

  • CAS操做的原子性,假設threadA先執行了上面那行代碼,併成功更新了p.next爲newNode
  • 這時候threadB天然在進行CAS比較的時候就會失敗了(p.next!=null),因此會進行下一次循環從新獲取tail結點而後嘗試更新

這時候的隊列狀況以下

  • threadB得到tail結點以後,發現其q!=nullq=p.next,p=tail

  • 繼續判斷p==q也是false,因此執行代碼(8)

  • 分析一下p = (p != t && t != (t = tail)) ? t : q這個代碼

    1. p != t:p爲tail,t爲tail,因此爲false
    2. t != (t = tail):顯然也是false
    3. 因此上面三目運算的結果就是p=q,以下圖所示結果

  • 而後再次執行循環,這時候p.next就是null了,因此能夠執行代碼(5)p.casNext(null,newNode)。這個時候CAS判斷獲得p.next == null,因此能夠設置p.next=Node(item2)

  • CAS成功後,判斷p!=t(如上圖所示),因此就能夠設置tail爲Node(item2)了。而後從offer退出,這個時候隊列狀況爲

​ 能夠看出,狀況1中假設兩個線程初始時候都拿到的是p=tail,p.next=null,那麼都會執行CAS嘗試添加newNode,可是隻有一個線程可以在第一次循環的時候添加成功而後返回true(可是這時候的tail尚未變化,相似單線程總結那塊的tail和真正的尾節點差1或0),因此另外一個線程會在第二次循環中從新嘗試,這個時候就會改變p的指向,即p = (p != t && t != (t = tail)) ? t : q代碼處。而後再第三次循環中才能真正CAS添加成功(固然咱們這裏分析的是假想的兩個線程狀況,實際多線程環境確定更復雜,可是邏輯仍是差很少的)

狀況2

​ 這裏分析的是主要是代碼p = (p != t && t != (t = tail)) ? t : q的另外一種狀況,即p=t的狀況,仍是先分析一下這行,假設如今

  • p != t爲true,
  • t != (t = tail) : 也爲true(左邊的t是再循環開始的時候得到的指向tail的信息,括號中從新得到tail並賦值給t,這個時候有可能別的線程已經更改了 volatile修飾的tail了)

​ 那麼結果就是p 從新指向隊列的尾節點tail了,下面假想一種這樣的狀況

​ 實際上這種是利用volatile的可見性快速將一個要添加元素的線程找到當前隊列的尾節點,避免多餘的循環。 如圖,假設threadA此時讀取了變量tail,threadB恰好在這個時候添加若干Node後,此時會修改tail指針,那麼這個時候線程A再次執行t=tail時t會指向另一個節點,因此threadA先後兩次讀取的變量t指向的節點不相同,即t != (t = tail)爲true,而且因爲t指向節點的變化p != t也爲true,此時該行代碼的執行結果爲p和t最新的t指針指向了同一個節點,而且此時t也是隊列真正的尾節點。那麼,如今已經定位到隊列真正的隊尾節點,就能夠執行offer操做了。

狀況3

​ 上面咱們討論的都是多線程去添加元素的操做,那麼當既有線程offer也有線程調用poll方法的時候呢,這裏就要調用offer方法中的代碼塊(7)了。由於尚未說到poll方法,因此這裏的代碼就先不作解釋,下面講poll方法在多線程中的執行的時候,會拿offer-poll-offer這種狀況進行說明,那麼offer方法就可能執行這幾行代碼了。

else if (p == q)
    //(7)多線程操做的時候,可能會有別的線程使用poll方法移除元素後可能會把head的next變成head,因此這裏須要找到新的head
    p = (t != (t = tail)) ? t : head;
複製代碼

add方法

public boolean add(E e) {
    return offer(e);//這裏仍是調用的offer方法,上面說到了,這裏就不說明了
}
複製代碼

poll方法

​ poll方法是在隊列頭部獲取並移除一個元素,若是隊列爲空就返回null,下面先看下poll方法的源碼,而後仍是分別分析單線程和多線程下的執行

public E poll() {
    //標記
    restartFromHead:
    for (;;) {//自旋循環
        for (Node<E> h = head, p = h, q;;) {
            //(1)保存當前結點的item
            E item = p.item;
            //(2)若是當前結點的值不爲null,那就將其變爲null
            if (item != null && p.casItem(item, null)) {
                //(3)CAS成功以後會標記當前結點,並從鏈表中移除
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //(4)若是隊列爲空會返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(5)若是當前結點被自引用了,從新找尋新的隊列頭節點
            else if (p == q)
                continue restartFromHead;
            else
                p = q; //進行下一次循環,改變p的指向位置
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
複製代碼

上面咱們已經看了poll方法的源碼,下面咱們就按照這個方法的實現經過圖示的方式來理解一下。

單線程執行

​ poll操做是從隊頭獲取元素,因此:

  • 從head結點開始循環,首先for (Node<E> h = head, p = h, q;;)得到當前隊列的頭節點,固然若是隊列一開始就爲空的時候,就以下所示

​ 因爲head結點是做爲哨兵結點存在的,因此會執行到代碼(4)else if ((q = p.next) == null),由於隊列爲空,因此直接執行updateHead(h, p),而updateHead方法中判斷的h=p,因此直接返回null。

  • 上面是隊列爲空的狀況 ,那麼當隊列不爲空的時候呢,假設如今隊列狀況以下所示

  • 因此在代碼(4)else if ((q = p.next) == null)處的判斷結果是false,

  • 因此執行下一個判斷else if (p == q),判斷結果仍是false

  • 最後執行p=q,完了以後下一次循環隊列狀態爲

  • 在新的一次循環中,能夠判斷獲得item!=null,因此使用CAS方式將item設置爲null,(這是單線程狀況下的測試)因此繼續執行if(p!=h),判斷結果爲true。因此執行if中的內容:updateHead(h, ((q = p.next) != null) ? q : p),什麼意思呢?以下所示,因此咱們這裏的結果就是q=null,因此傳入的參數爲p(p指向的位置如上圖所示)

    //updateHead方法的參數(Node h,Node p)
    q = p.next;
    if(null != q) {
    	//第二個參數就是q
    } else {
        //第二個參數就是p
    }
    複製代碼

    而後執行updateHead方法,這裏咱們須要再看一下該方法的細節

    final void updateHead(Node<E> h, Node<E> p) {
        //若是h!=p,就以CAS的方式將head結點設置爲p
        if (h != p && casHead(h, p))
            //這裏是將h結點的next結點設置爲本身(h)
            h.lazySetNext(h);
    }
    //Node類中的方法
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    複製代碼

    那麼執行完這些以後,隊列中狀態是什麼樣呢,以下圖所示。執行完畢就返回被移除的元素怒item1

多線程執行offer、poll

​ 上面分析了單線程下,調用poll方法的執行流程。其實剛剛再將offer方法的時候還有一個坑沒有解決。以下描述的狀況

  • 假設原有隊列中有一個元素item1

  • 假設在thread1調用offer方法的時候,別的線程恰好調用poll方法將head結點移除了,按照上面的分析,poll方法調用後隊列的狀況以下

  • (這裏回憶一下offer的執行流程)因此在thread1繼續執行的時候,執行的for (Node<E> t = tail, p = t;;)以後得到tail指向的位置如上圖所示,可是這個tail指向的結點的next指針指向的位置仍是本身。因此Node<E> q = p.next執行以後q=tail=p。因此在offer方法中就會執行如下判斷

    else if (p == q)
        //(7)多線程操做的時候,可能會有別的線程使用poll方法移除元素後可能會把head的next變成head,因此這裏須要找到新的head
        p = (t != (t = tail)) ? t : head;
    複製代碼

    仍是簡單分析一下p = (t != (t = tail)) ? t : head這句,以下所示。簡單分析以後就能得出,p指向了poll方法調用完畢後的新的head結點(如上圖所示的head結點),而後調用offer的線程就能正常的添加結點了,具體流程仍是和上面講到的同樣。(那這個tail又在何時被指向隊尾結點呢,實際上在調用offer方法添加完元素以後p.casNext(null, newNode),就會判斷得出p != t,那完了以後就會更新tail指向的位置了)

    //在最開始時候得到的t=tail
    t=tail; //for循環中賦值t
    //...offer的其餘代碼
    if(t != (t = tail)) { //這裏仍是同樣:tail爲volatile修飾,因此從新讀取tail變量
        p = t; //這裏表示tail結點不變(按照上圖poll執行完後的狀況,tail指向位置沒有變化,因此p不會被賦值爲t)
    } else {
        p = head; //注意這時候的head已經指向的新的首結點
    }
    複製代碼

多線程執行poll、poll

​ 分析這麼多,咱們發現跟offer方法留坑同樣,poll還有一處代碼尚未分析,因此下面仍是經過圖示進行分析,先看下這個代碼框架。

//標記
restartFromHead:
for (;;) {//自旋循環
    for (Node<E> h = head, p = h, q;;) {
        //...other code
        //這是自旋循環體中的一個判斷
        else if (p == q)
            continue restartFromHead;
    }
}
複製代碼

​ 仍是假設如今兩個線程去執行poll方法,

  • 初始狀況下的隊列狀態爲

  • 假設threadA執行poll方法,併成功的執行if (item != null && p.casItem(item, null))這塊,將item1設置爲了null,以下圖所示。

  • 可是threadA尚未執行updateHead方法,這個時候threadB執行poll以後,p指向了上圖中的head,以下所示

  • 以後threadA執行updateHead方法更新了head的指向,並將原head的next結點指向本身.那麼線程B執行q=p.next,天然獲得的就是p==q的結果了,因此這個時候就須要跳到外層循環從新獲取最新的head結點,而後繼續執行

poll方法總結

​ poll方法在移除頭部元素的時候,使用CAS操做將頭節點的item設置爲了null,而後經過沖洗設置頭節點head的指向位置來達到刪除隊列元素的效果。這個時候原來的頭部哨兵結點就是一個孤立的結點了,會被回收掉。固然,若是線程執行poll方法的時候發現head結點被修改(上面說的這種狀況),就須要跳轉到最外層循環從新獲取新的結點。

peek方法

​ 獲取隊列頭部的第一個元素但不刪除,若是隊列爲空則返回null。下面是該方法的實現

public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
複製代碼

​ 須要注意的是,第一次調用peek方法的時候會刪除哨兵結點,並讓隊列中的head結點指向隊列中的第一個元素或者null.

size方法

​ 計算當前隊列元素個數,可是由於使用的是CAS的方式在併發環境下可能由於別的線程刪除或者增長元素致使計算結果不許確。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
//找到隊列中的第一個元素(head指向的item爲null的結點不算(就是哨兵結點)),
//沒有則返回null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
複製代碼

remove方法

​ 傳入的參數爲要刪除的元素,若是隊列中存在該元素就刪除找到的第一個,而後返回true,不然返回false

public boolean remove(Object o) {
    if (o != null) { //若是傳入參數爲null,直接返回false
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            //找到相等的就使用cas設置爲null,只有一個線程操做成功
            //別的循環查找是否又別的匹配的obj
            if (item != null) {
                if (!o.equals(item)) {
                    //獲取next元素
                    next = succ(p);
                    continue;
                }
                removed = p.casItem(item, null);
            }

            next = succ(p);
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}
複製代碼

參考自《Java併發編程的藝術》

相關文章
相關標籤/搜索