本文首發於一世流雲專欄: https://segmentfault.com/blog...
ConcurrentLinkedQueue是JDK1.5時隨着J.U.C一塊兒引入的一個支持併發環境的隊列。從名字就能夠看出來,ConcurrentLinkedQueue底層是基於鏈表實現的。java
Doug Lea在實現ConcurrentLinkedQueue時,並無利用鎖或底層同步原語,而是徹底基於自旋+CAS的方式實現了該隊列。回想一下AQS,AQS內部的CLH等待隊列也是利用了這種方式。node
因爲是徹底基於無鎖算法實現的,因此當出現多個線程同時進行修改隊列的操做(好比同時入隊),極可能出現CAS修改失敗的狀況,那麼失敗的線程會進入下一次自旋,再嘗試入隊操做,直到成功。因此,在併發量適中的狀況下,ConcurrentLinkedQueue通常具備較好的性能。算法
咱們來看下ConcurrentLinkedQueue的內部結構,:segmentfault
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { /** * 隊列頭指針 */ private transient volatile Node<E> head; /** * 隊列尾指針. */ private transient volatile Node<E> tail; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } /** * 隊列結點定義 */ private static class Node<E> { volatile E item; // 元素值 volatile Node<E> next; // 後驅指針 Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } //... }
能夠看到,ConcurrentLinkedQueue內部就是一個簡單的單鏈表結構,每入隊一個元素就是插入一個Node類型的結點。字段head
指向隊列頭,tail
指向隊列尾,經過Unsafe
來CAS操做字段值以及Node對象的字段值。併發
ConcurrentLinkedQueue包含兩種構造器:性能
/** * 構建一個空隊列(head,tail均指向一個佔位結點). */ public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
/** * 根據已有集合,構造隊列 */ public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
咱們重點看下空構造器,經過空構造器創建的ConcurrentLinkedQueue對象,其head
和tail
指針並不是指向null
,而是指向一個item值爲null的Node
結點——哨兵結點,以下圖:
this
元素的入隊是在隊尾插入元素,關於隊列的操做,若是讀者不熟悉,能夠參考《Algorithms 4th》或個人這篇博文:https://www.jianshu.com/p/f9b...spa
ConcurrentLinkedQueue的入隊代碼很簡單,卻很是精妙:線程
/** * 入隊一個元素. * * @throws NullPointerException 元素不能爲null */ public boolean add(E e) { return offer(e); }
/** * 在隊尾入隊元素e, 直到成功 */ public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t; ; ) { // 自旋, 直到插入結點成功 Node<E> q = p.next; if (q == null) { // CASE1: 正常狀況下, 新結點直接插入到隊尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會在下一次自旋中進入該邏輯 casTail(t, newNode); // 從新設置隊尾指針tail return true; } // CAS競爭插入失敗,則進入下一次自旋 } else if (p == q) // CASE2: 發生了出隊操做 p = (t != (t = tail)) ? t : head; else // 將p從新指向隊尾結點 p = (p != t && t != (t = tail)) ? t : q; } }
咱們來分析下上面offer方法
的實現。單線程的狀況下,元素入隊比較好理解,直接線性地在隊首插入元素便可。如今咱們假設有兩個線程ThreadA和ThreadB同時進行入隊操做:3d
①ThreadA先單獨入隊兩個元素九、2
此時隊列的結構以下:
②ThreadA入隊元素「10」,ThreadB入隊元素「25」
此時ThreadA和ThreadB若併發執行,咱們看下會發生什麼:
一、ThreadA和ThreadB同時進入自旋中的如下代碼塊:
if (q == null) { // CASE1: 正常狀況下, 新結點直接插入到隊尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會在下一次自旋中進入該邏輯 casTail(t, newNode); // 從新設置隊尾指針tail return true; } // CAS競爭插入失敗,則進入下一次自旋 }
二、ThreadA執行cas操做(p.casNext)成功,插入新結點「10」
ThreadA執行完成後,直接返回true,隊列結構以下:
三、ThreadB執行cas操做(p.casNext)失敗
因爲CAS操做同時修改隊尾元素,致使ThreadB操做失敗,則ThreadB進入下一次自旋;
在下一次自旋中,進入如下代碼塊:
else // 將p從新指向隊尾結點 p = (p != t && t != (t = tail)) ? t : q;
上述分支的做用就是讓p指針從新定位到隊尾結點,此時隊列結構以下:
而後ThreadB會繼續下一次自旋,並再次進入如下代碼塊:
if (q == null) { // CASE1: 正常狀況下, 新結點直接插入到隊尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會在下一次自旋中進入該邏輯 casTail(t, newNode); // 從新設置隊尾指針tail return true; } // CAS競爭插入失敗,則進入下一次自旋 }
此時,CAS操做成功,隊列結構以下:
因爲此時p!=t
,因此會調用casTail
方法從新設置隊尾指針:
casTail(t, newNode); // 從新設置隊尾指針tail
最終隊列以下:
從上面的分析過程能夠看到,因爲入隊元素必定是要連接到隊尾的,但併發狀況下隊尾結點可能隨時變化,因此就須要指針定位最新的隊尾結點,並在入隊時判斷隊尾結點是否改變了,若是改變了,就須要從新設置定位指針,而後在下一次自旋中繼續嘗試入隊操做。
上面整個執行步驟有一段分支尚未覆蓋到:
else if (p == q) // CASE2: 發生了出隊操做 p = (t != (t = tail)) ? t : head;
這個分支只有在元素入隊的同時,針對該元素也發生了「出隊」操做纔會執行,咱們後面會分析元素的「出隊」,理解了「出隊」操做再回頭來看這個分支就容易理解不少了。
隊列中元素的「出隊」是從隊首移除元素,咱們來看下ConcurrentLinkedQueue是如何實現出隊的:
/** * 在隊首出隊元素, 直到成功 */ public E poll() { restartFromHead: for (; ; ) { for (Node<E> h = head, p = h, q; ; ) { E item = p.item; if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // CASE1: 隊首是一個哨兵結點(item==null) updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
仍是經過示例來看,假設初始的隊列結構以下:
①ThreadA先單獨進行出隊操做
因爲head所指的是item==null的結點,因此ThreadA會執行如下分支:
else p = q;
而後進入下一次自旋,在自旋中執行如下分支,若是CAS操做成功,則移除首個有效元素,並從新設置頭指針:
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
此時隊列的結構以下:
若是ThreadA的CAS操做失敗呢?
CAS操做失敗則會進入如下分支,並從新開始自旋:
else if (p == q) continue restartFromHead;
最終前面兩個null結點會被GC回收,隊列結構以下:
②ThreadA繼續進行出隊操做
ThreadA繼續執行「出隊」操做,仍是執行如下分支:
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
可是此時p==h
,因此僅將頭結點置null
,這實際上是一種「懶刪除」的策略。
出隊元素「2」:
出隊元素「10」:
最終隊列結果以下:
③ThreadA進行出隊,其它線程進行入隊
這是最特殊的一種狀況,當隊列中只剩下一個元素時,若是同時發生出隊和入隊操做,會致使隊列出現下面這種結構:(假設ThreadA進行出隊元素「25」,ThreadB進行入隊元素「11」)
此時tail.next=tail自身,因此ThreadB在執行入隊時,會進入到offer方法的如下分支:
else if (p == q) // CASE2: 發生了出隊操做 p = (t != (t = tail)) ? t : head;
ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法來保證線程併發訪問時的數據一致性。因爲隊列自己是一種鏈表結構,因此雖然算法看起來很簡單,但其實須要考慮各類併發的狀況,實現複雜度較高,而且ConcurrentLinkedQueue不具有實時的數據一致性,實際運用中,隊列通常在生產者-消費者的場景下使用得較多,因此ConcurrentLinkedQueue的使用場景並不如阻塞隊列那麼多。
另外,關於ConcurrentLinkedQueue還有如下須要注意的幾點:
ConcurrentModificationException
,這也就形成在遍歷某個還沒有被修改的結點時,在next方法返回時能夠看到該結點的修改,但在遍歷後再對該結點修改時就看不到這種變化。size
方法須要遍歷鏈表,因此在併發狀況下,其結果不必定是準確的,只能供參考。