Java併發編程筆記之ConcurrentLinkedQueue源碼探究

JDK 中基於鏈表的非阻塞無界隊列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 內部是如何使用 CAS 非阻塞算法來保證多線程下入隊出隊操做的線程安全?算法

ConcurrentLinkedQueue是線程安全的無界非阻塞隊列,其底層數據結構是使用單向鏈表實現,入隊和出隊操做是使用咱們常常提到的CAS來保證線程安全的。安全

咱們首先看一下ConcurrentLinkedQueue的類圖結構先,好有一個內部邏輯有一個大概的印象,以下圖所示:數據結構

能夠清楚的看到ConcurrentLinkedQueue內部的隊列是使用單向鏈表方式實現,類中兩個volatile 類型的Node 節點分別用來存放隊列的首位節點。多線程

首先咱們先來看一下ConcurrentLinkedQueue的構造函數,以下:併發

public ConcurrentLinkedQueue() {
   head = tail = new Node<E>(null);
}

經過無參構造函數可知默認頭尾節點都是指向 item 爲 null 的哨兵節點。函數

Node節點內部則維護一個volatile 修飾的變量item 用來存放節點的值,next用來存放鏈表的下一個節點,從而連接爲一個單向無界鏈表,這就是單向無界的根本緣由。以下圖:spa

 

接下來看ConcurrentLinkedQueue 主要關注入隊,出隊,獲取隊列元素的方法的源碼,以下所示:線程

1.首先看入隊方法offer,offer 操做是在隊列末尾添加一個元素,若是傳遞的參數是 null 則拋出 NPE 異常,否者因爲 ConcurrentLinkedQueue 是無界隊列該方法一直會返回 true。另外因爲使用 CAS 無阻塞算法,該方法不會阻塞調用線程,其源碼以下:3d

 

public boolean offer(E e) {
    //(1)e爲null則拋出空指針異常
    checkNotNull(e);

   //(2)構造Node節點
    final Node<E> newNode = new Node<E>(e);

    //(3)從尾節點進行插入
    for (Node<E> t = tail, p = t;;) {

        Node<E> q = p.next;

        //(4)若是q==null說明p是尾節點,則執行插入
        if (q == null) {

            //(5)使用CAS設置p節點的next節點
            if (p.casNext(null, newNode)) {
                //(6)cas成功,則說明新增節點已經被放入鏈表,而後設置當前尾節點
                if (p != t)
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        else if (p == q)//(7)
            //多線程操做時候,因爲poll操做移除元素後有可能會把head變爲自引用,而後head的next變爲新head,因此這裏須要
            //從新找新的head,由於新的head後面的節點纔是正常的節點。
            p = (t != (t = tail)) ? t : head;
        else
            //(8) 尋找尾節點
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

 類圖結構時候談到構造隊列時候參構造函數建立了一個 item 爲 null 的哨兵節點,而且 head 和 tail 都是指向這個節點,下面經過圖形結合來說解下 offer 操做的代碼實現。指針

  1.首先看一下,當一個線程調用offer(item)時候狀況:首先代碼(1)對傳參判斷空檢查,若是爲null 則拋出空指針異常,而後代碼(2)則使用item做爲構造函數參數建立一個新的節點,

代碼(3)從隊列尾部節點開始循環,目的是從隊列尾部添加元素。以下圖:

 

上圖是執行代碼(4)時候隊列的狀況,這時候節點 p , t ,head ,tail 同時指向了item爲null的哨兵節點,因爲哨兵節點的next節點爲null,因此這裏q指向也是null。

代碼(4)發現q==null  則執行代碼(5),經過CAS原子操做判斷p 節點的next節點是否爲null,若是爲null 則使用節點newNode替換p 的next節點,

而後執行代碼(6),因爲 p == t ,因此沒有設置尾部節點,而後退出offer方法,這時候隊列的狀態圖以下:

 

上面講解的是一個線程調用offer方法的狀況下,若是多個線程同時調用,就會存在多個線程同時執行到代碼(5),假設線程A調用offer(item1),

線程B調用offer(item2),線程 A 和線程B同時到 p.casNext(null,newNode)。而CAS的比較並設置操做是原子性的,假設線程A先執行了比較設置操做,

則發現當前P的next節點確實是null ,則會原子性更新next節點爲newNode,這時候線程B 也會判斷p 的next節點是否爲null,結果發現不是null,(由於線程 A 已經設置了 p 的 next 爲 newNode)則會跳到代碼(3),

而後執行到代碼(4)的時候的隊列分佈圖以下:

 根據這個狀態圖可知線程B會執行代碼(8),而後q 賦值給了p,這個時候狀態圖爲:

而後線程B再次跳轉到代碼(3)執行,當執行到代碼(4)時候隊列狀態圖爲:

因爲這時候q == null ,因此線程B 會執行步驟(5),經過CAS操做判斷 當前p的next 節點是否爲null ,不是則再次循環後嘗試,是則使用newNode替換,假設CAS成功了,那麼執行步驟(6),

因爲 p != t 因此設置tail節點爲newNode ,而後退出offer方法。這時候隊列的狀態圖爲:

到如今爲止,offer代碼在執行路徑如今就差步驟(7)尚未執行過,其實這個要在執行poll操做纔會出現的,這裏先看一下執行poll操做後可能會存在的一種狀況,以下圖所示:

下面分析下當隊列處於這種狀態調用offer添加元素代碼執行到代碼(4)的時候的隊列狀態圖,以下:

因爲q節點不爲空而且p==q 因此執行代碼(7),由於 t == tail因此p 被賦值爲head ,而後進入循環,循環後執行到代碼(4)的時候的隊列狀態圖,以下:

因爲 q ==null,因此執行代碼(5),進行CAS操做,若是當前沒有其餘線程執行offer操做,則CAS操做會成功,p的next節點被設置爲新增節點,而後執行代碼(6),

因爲p != t 因此設置新節點爲隊列尾節點,如今隊列狀態圖,以下:

在這裏的自引用的節點會被垃圾回收掉,可見offer操做裏面關鍵步驟是代碼(5)經過原子CAS操做來進行控制同時只有一個線程能夠追加元素到隊列末尾,進行cas競爭失敗的線程,

則會經過循環一次次嘗試進行cas操做,知道cas成功纔會返回,也就是經過使用無限循環裏面不斷進行CAS嘗試方式來替代阻塞算法掛起調用線程,相比阻塞算法,這是使用CPU資源換取阻塞帶來的開銷。

 

  2.poll操做,poll 操做是在隊列頭部獲取而且移除一個元素,若是隊列爲空則返回 null,咱們首先看改方法的源碼,以下:

public E poll() {
    //(1) goto標記
    restartFromHead:

    //(2)無限循環
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {

            //(3)保存當前節點值
            E item = p.item;

            //(4)當前節點有值則cas變爲null
            if (item != null && p.casItem(item, null)) {
                //(5)cas成功標誌當前節點以及從鏈表中移除
                if (p != h) 
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //(6)當前隊列爲空則返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(7)自引用了,則從新找新的隊列頭節點
            else if (p == q)
                continue restartFromHead;
            else//(8)
                p = q;
        }
    }
 }
  final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

poll操做是從隊頭獲取元素,因此代碼(2)內層循環是從head節點開始迭代,代碼(3)獲取當前隊頭的節點,當隊列一開始爲空的時候隊列狀態爲:

因爲head 節點指向的item 爲null 的哨兵節點,因此會執行到代碼(6),假設這個過程沒有線程調用offer,則此時q等於null  ,以下圖:

因此執行updateHead方法,因爲h 等於 p因此沒有設置頭節點,poll方法直接返回null。

假設執行到代碼(6)的時候已經有其餘線程調用了offer 方法成功添加了一個元素到隊列,這時候q執行的是新增元素的節點,這時候隊列狀態圖爲:

因此代碼(6)判斷結果爲false,而後會轉向代碼(7)執行,而此時p不等於q,因此轉向代碼(8)執行,執行結果是p指向了節點q,此時的隊列狀態以下:

而後程序轉向代碼(3)執行,p如今指向的元素值不爲null,則執行p.casItem(item, null) 經過 CAS 操做嘗試設置 p 的 item 值爲 null,

若是此時沒有其餘線程進行poll操做,CAS成功則執行代碼(5),因爲此時 p != h ,因此設置頭節點爲p,poll而後返回被從隊列移除的節點值item。此時隊列狀態爲:

這個狀態就是前面提到offer操做的時候,offer代碼的執行路徑(7)執行的前提狀態。

假如如今一個線程調用了poll操做,則在執行代碼(4)的時候的隊列狀態爲:

能夠看到這時候執行代碼(6)返回null。

如今poll的代碼還有個分支(7)尚未被執行過,那麼何時會執行呢?假設線程A執行poll操做的時候,當前的隊列狀態,以下:

那麼執行p.casItem(item, null) 經過 CAS 操做嘗試設置 p 的 item 值爲 null。

假設 CAS 設置成功則標示該節點從隊列中移除了,此時隊列狀態爲:

而後因爲p != h,因此會執行updateHead 方法,假如線程A執行updateHead前,另一個線程B開始poll操做,這時候線程B的p指向head節點,

可是尚未執行到代碼(6),這時候隊列狀態爲:

而後線程A執行 updateHead 操做,執行完畢後線程 A 退出,這時候隊列狀態爲:

而後線程B繼續執行代碼(6)q=p.next因爲該節點是自引用節點因此p==q,因此會執行代碼(7)跳到外層循環restartFromHead,從新獲取當前隊列隊頭 head, 如今狀態爲:

 

總結:poll元素移除一個 元素的時候,只是簡單的使用CAS操做把當前節點的item值設置爲null,而後經過從新設置頭節點讓該元素從隊列裏面摘除,

被摘除的節點就成了孤立節點,這個節點會被在GC的時候會被回收掉。另外,執行分支中若是發現頭節點被修改了要跳到外層循環從新獲取新的頭節點。

 

  3.peek操做,peek 操做是獲取隊列頭部一個元素(只不獲取不移除),若是隊列爲空則返回 null,其源碼以下:

public E peek() {
   //(1)
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //(2)
            E item = p.item;
            //(3)
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //(4)
            else if (p == q)
                continue restartFromHead;
            else
            //(5)
                p = q;
        }
    }
}

代碼結構與poll操做相似,不一樣於代碼(3)的使用只是少了castItem 操做,其實這很正常,由於peek只是獲取隊列頭元素值,並不清空其值,

根據前面咱們知道第一次執行 offer 後 head 指向的是哨兵節點(也就是 item 爲 null 的節點),那麼第一次peek的時候,代碼(3)中會發現item==null,

而後會執行 q = p.next, 這時候 q 節點指向的纔是隊列裏面第一個真正的元素或者若是隊列爲 null 則 q 指向 null。

 

當隊列爲空的時候,隊列狀態圖,以下:

這時候執行updateHead 因爲 h 節點等於 p 節點因此不進行任何操做,而後 peek 操做會返回 null。

當隊列中至少有一個元素的時候(假如只有一個),這時候隊列狀態爲:

這時候執行代碼(5)這時候 p 指向了 q 節點,而後執行代碼(3)這時候隊列狀態爲:

執行代碼(3)發現 item 不爲 null,則執行 updateHead 方法,因爲 h!=p, 因此設置頭結點,設置後隊列狀態爲:

能夠看到其實就是剔除了哨兵節點。

 

總結:peek操做代碼與poll操做相似,只是前者只獲取隊列頭元素,可是並不從隊列裏面刪除,然後者獲取後須要從隊列裏面刪除,另外,在第一次調用peek操做的時候,

會刪除哨兵節點,並讓隊列的head節點指向隊列裏面第一個元素或者null。

 

  4.size方法,獲取當前隊列元素個數,在併發環境下不是頗有用,由於 CAS 沒有加鎖因此從調用 size 函數到返回結果期間有可能增刪元素,致使統計的元素個數不精確。源碼以下:

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // 最大返回Integer.MAX_VALUE
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
//獲取第一個隊列元素(哨兵元素不算),沒有則爲null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
//獲取當前節點的next元素,若是是自引入節點則返回真正頭節點
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

 

  5.remove方法,若是隊列裏面存在該元素則刪除給元素,若是存在多個則刪除第一個,並返回 true,否者返回 false。源碼以下:

public boolean remove(Object o) {

    //查找元素爲空,直接返回false
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;

        //相等則使用cas值null,同時一個線程成功,失敗的線程循環查找隊列中其它元素是否有匹配的。
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null)) {

            //獲取next元素
            Node<E> next = succ(p);

            //若是有前驅節點,而且next不爲空則連接前驅節點到next,
            if (pred != null && next != null)
                pred.casNext(p, next);
            return true;
        }
        pred = p;
    }
    return false;
}

 

ConcurrentLinkedQueue 底層使用單向鏈表數據結構來保存隊列元素,每一個元素被包裝爲了一個 Node 節點,隊列是靠頭尾節點來維護的,建立隊列時候頭尾節點指向一個 item 爲 null 的哨兵節點,

第一次 peek 或者 first 時候會把 head 指向第一個真正的隊列元素。因爲使用非阻塞 CAS 算法,沒有加鎖,因此獲取 size 的時候有可能進行了 offer,poll 或者 remove 操做,致使獲取的元素個數不精確,因此在併發狀況下 size 函數不是頗有用。

 

  • JDK 中基於鏈表的非阻塞無界隊列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 內部是如何使用 CAS 非阻塞算法來保證多線程下入隊出隊操做的線程安全?
相關文章
相關標籤/搜索