Java多線程進階(二九)—— J.U.C之collections框架:ConcurrentLinkedQueue

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、ConcurrentLinkedQueue簡介

ConcurrentLinkedQueue是JDK1.5時隨着J.U.C一塊兒引入的一個支持併發環境的隊列。從名字就能夠看出來,ConcurrentLinkedQueue底層是基於鏈表實現的。java

Doug Lea在實現ConcurrentLinkedQueue時,並無利用鎖或底層同步原語,而是徹底基於自旋+CAS的方式實現了該隊列。回想一下AQS,AQS內部的CLH等待隊列也是利用了這種方式。node

因爲是徹底基於無鎖算法實現的,因此當出現多個線程同時進行修改隊列的操做(好比同時入隊),極可能出現CAS修改失敗的狀況,那麼失敗的線程會進入下一次自旋,再嘗試入隊操做,直到成功。因此,在併發量適中的狀況下,ConcurrentLinkedQueue通常具備較好的性能。算法

clipboard.png

2、ConcurrentLinkedQueue原理

隊列結構

咱們來看下ConcurrentLinkedQueue的內部結構,:segmentfault

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
 
    /**
     * 隊列頭指針
     */
    private transient volatile Node<E> head;
 
    /**
     * 隊列尾指針.
     */
    private transient volatile Node<E> tail;
 
    // Unsafe mechanics
     
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
     
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    /**
     * 隊列結點定義
     */
    private static class Node<E> {
        volatile E item;        // 元素值
        volatile Node<E> next;  // 後驅指針
 
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
 
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
 
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
 
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
 
        // Unsafe mechanics
 
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
 
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
 
    //...
}

能夠看到,ConcurrentLinkedQueue內部就是一個簡單的單鏈表結構,每入隊一個元素就是插入一個Node類型的結點。字段head指向隊列頭,tail指向隊列尾,經過Unsafe來CAS操做字段值以及Node對象的字段值。併發

clipboard.png


構造器定義

ConcurrentLinkedQueue包含兩種構造器:性能

/**
 * 構建一個空隊列(head,tail均指向一個佔位結點).
 */
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
/**
 * 根據已有集合,構造隊列
 */
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

咱們重點看下空構造器,經過空構造器創建的ConcurrentLinkedQueue對象,其headtail指針並不是指向null,而是指向一個item值爲null的Node結點——哨兵結點,以下圖:
clipboard.pngthis


入隊操做

元素的入隊是在隊尾插入元素,關於隊列的操做,若是讀者不熟悉,能夠參考《Algorithms 4th》或個人這篇博文:https://www.jianshu.com/p/f9b...spa

ConcurrentLinkedQueue的入隊代碼很簡單,卻很是精妙:線程

/**
 * 入隊一個元素.
 *
 * @throws NullPointerException 元素不能爲null
 */
public boolean add(E e) {
    return offer(e);
}
/**
 * 在隊尾入隊元素e, 直到成功
 */
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t; ; ) {             // 自旋, 直到插入結點成功
        Node<E> q = p.next;
        if (q == null) {                            // CASE1: 正常狀況下, 新結點直接插入到隊尾
            if (p.casNext(null, newNode)) {
                // CAS競爭插入成功
                if (p != t)                         // CAS競爭失敗的線程會在下一次自旋中進入該邏輯
                    casTail(t, newNode);            // 從新設置隊尾指針tail
                return true;
            }
            // CAS競爭插入失敗,則進入下一次自旋
 
        } else if (p == q)                          // CASE2: 發生了出隊操做
            p = (t != (t = tail)) ? t : head;
        else
            // 將p從新指向隊尾結點
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

咱們來分析下上面offer方法的實現。單線程的狀況下,元素入隊比較好理解,直接線性地在隊首插入元素便可。如今咱們假設有兩個線程ThreadAThreadB同時進行入隊操做:3d

①ThreadA先單獨入隊兩個元素九、2

此時隊列的結構以下:
clipboard.png


②ThreadA入隊元素「10」,ThreadB入隊元素「25」

此時ThreadA和ThreadB若併發執行,咱們看下會發生什麼:

一、ThreadA和ThreadB同時進入自旋中的如下代碼塊:

if (q == null) {                            // CASE1: 正常狀況下, 新結點直接插入到隊尾
    if (p.casNext(null, newNode)) {
        // CAS競爭插入成功
        if (p != t)                         // CAS競爭失敗的線程會在下一次自旋中進入該邏輯
            casTail(t, newNode);            // 從新設置隊尾指針tail
        return true;
    }
    // CAS競爭插入失敗,則進入下一次自旋
 
}

二、ThreadA執行cas操做(p.casNext)成功,插入新結點「10」

ThreadA執行完成後,直接返回true,隊列結構以下:
clipboard.png

三、ThreadB執行cas操做(p.casNext)失敗

因爲CAS操做同時修改隊尾元素,致使ThreadB操做失敗,則ThreadB進入下一次自旋;
在下一次自旋中,進入如下代碼塊:

else
    // 將p從新指向隊尾結點
    p = (p != t && t != (t = tail)) ? t : q;

上述分支的做用就是讓p指針從新定位到隊尾結點,此時隊列結構以下:
clipboard.png

而後ThreadB會繼續下一次自旋,並再次進入如下代碼塊:

if (q == null) {                            // CASE1: 正常狀況下, 新結點直接插入到隊尾
    if (p.casNext(null, newNode)) {
        // CAS競爭插入成功
        if (p != t)                         // CAS競爭失敗的線程會在下一次自旋中進入該邏輯
            casTail(t, newNode);            // 從新設置隊尾指針tail
        return true;
    }
    // CAS競爭插入失敗,則進入下一次自旋
 
}

此時,CAS操做成功,隊列結構以下:
clipboard.png

因爲此時p!=t ,因此會調用casTail方法從新設置隊尾指針:

casTail(t, newNode);            // 從新設置隊尾指針tail

最終隊列以下:
clipboard.png

從上面的分析過程能夠看到,因爲入隊元素必定是要連接到隊尾的,但併發狀況下隊尾結點可能隨時變化,因此就須要指針定位最新的隊尾結點,並在入隊時判斷隊尾結點是否改變了,若是改變了,就須要從新設置定位指針,而後在下一次自旋中繼續嘗試入隊操做。

上面整個執行步驟有一段分支尚未覆蓋到:

else if (p == q)                          // CASE2: 發生了出隊操做
     p = (t != (t = tail)) ? t : head;

這個分支只有在元素入隊的同時,針對該元素也發生了「出隊」操做纔會執行,咱們後面會分析元素的「出隊」,理解了「出隊」操做再回頭來看這個分支就容易理解不少了。


出隊操做

隊列中元素的「出隊」是從隊首移除元素,咱們來看下ConcurrentLinkedQueue是如何實現出隊的:

/**
 * 在隊首出隊元素, 直到成功
 */
public E poll() {
    restartFromHead:
    for (; ; ) {
        for (Node<E> h = head, p = h, q; ; ) {
            E item = p.item;
 
            if (item != null && p.casItem(item, null)) {    // CASE2: 隊首是非哨兵結點(item!=null)
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            } else if ((q = p.next) == null) {      // CASE1: 隊首是一個哨兵結點(item==null)
                updateHead(h, p);
                return null;
            } else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

仍是經過示例來看,假設初始的隊列結構以下:
clipboard.png

①ThreadA先單獨進行出隊操做

因爲head所指的是item==null的結點,因此ThreadA會執行如下分支:

else
   p = q;

clipboard.png

而後進入下一次自旋,在自旋中執行如下分支,若是CAS操做成功,則移除首個有效元素,並從新設置頭指針:

if (item != null && p.casItem(item, null)) {    // CASE2: 隊首是非哨兵結點(item!=null)
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

此時隊列的結構以下:
clipboard.png

若是ThreadA的CAS操做失敗呢?

CAS操做失敗則會進入如下分支,並從新開始自旋:

else if (p == q)
    continue restartFromHead;

最終前面兩個null結點會被GC回收,隊列結構以下:
clipboard.png


②ThreadA繼續進行出隊操做

ThreadA繼續執行「出隊」操做,仍是執行如下分支:

if (item != null && p.casItem(item, null)) {    // CASE2: 隊首是非哨兵結點(item!=null)
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

可是此時p==h,因此僅將頭結點置null,這實際上是一種「懶刪除」的策略。

出隊元素「2」:
clipboard.png

出隊元素「10」:
clipboard.png

最終隊列結果以下:
clipboard.png


③ThreadA進行出隊,其它線程進行入隊

這是最特殊的一種狀況,當隊列中只剩下一個元素時,若是同時發生出隊和入隊操做,會致使隊列出現下面這種結構:(假設ThreadA進行出隊元素「25」,ThreadB進行入隊元素「11」)

clipboard.png

此時tail.next=tail自身,因此ThreadB在執行入隊時,會進入到offer方法的如下分支:

else if (p == q)                          // CASE2: 發生了出隊操做
    p = (t != (t = tail)) ? t : head;

3、總結

ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法來保證線程併發訪問時的數據一致性。因爲隊列自己是一種鏈表結構,因此雖然算法看起來很簡單,但其實須要考慮各類併發的狀況,實現複雜度較高,而且ConcurrentLinkedQueue不具有實時的數據一致性,實際運用中,隊列通常在生產者-消費者的場景下使用得較多,因此ConcurrentLinkedQueue的使用場景並不如阻塞隊列那麼多。

另外,關於ConcurrentLinkedQueue還有如下須要注意的幾點:

  1. ConcurrentLinkedQueue的迭代器是弱一致性的,這在併發容器中是比較廣泛的現象,主要是指在一個線程在遍歷隊列結點而另外一個線程嘗試對某個隊列結點進行修改的話不會拋出ConcurrentModificationException,這也就形成在遍歷某個還沒有被修改的結點時,在next方法返回時能夠看到該結點的修改,但在遍歷後再對該結點修改時就看不到這種變化。
  2. size方法須要遍歷鏈表,因此在併發狀況下,其結果不必定是準確的,只能供參考。
相關文章
相關標籤/搜索