在以前的文章中,已經對ArrayBlockingQueue、LinkedBlockingQueue這兩個比較經常使用的阻塞隊列作了源碼分析,咱們知道其內部都是經過ReentrantLock來保證數據讀寫的線程安全,經過Condition來完成線程等待和喚醒,只不過ArrayBlockingQueue在讀寫時使用了一把鎖所完成,而LinkedBlockingQueue對於讀和寫分別使用了兩把鎖來進行處理,從而達到讀寫分離的效果。算法
然而,經過鎖機制來實現一個線程安全的隊列,在併發不是特別高的狀況下並非很是合適,由於在大多數狀況下都只有幾個線程同時訪問,而每次執行都須要去加一次鎖,從而致使線程進行上下文切換,影響總體性能。所以,JDK還爲咱們提供了一個無鎖線程安全的隊列——ConcurrentLinkedQueue,其底層使用CAS來實現無阻塞的併發控制。本文將該隊列的實現機制和源碼作一個分析,讓咱們共同看看Doug Lea大神是如何巧妙地經過無鎖機制來實現一個線程安全的隊列。編程
首先讓咱們看看JDK文檔對該類的描述:安全
ConcurrentLinkedQueue的API描述.png多線程
ConcurrentLinkedQueue是一個基於鏈表、無界、線程安全的隊列。這個隊列將元素按照先進先出的順序進行存儲。隊列的頭節點是在隊列中存在時間最久的節點,隊列的尾節點是在隊列中存在時間最短的節點。新的元素會被插入到隊列的尾部,而隊列的元素的獲取操做則會從隊列的頭部去獲取元素。ConcurrentLinekedQueue適合做爲多個線程共享訪問的集合。與大多數併發集合的實現相似,該類也不容許添加null元素,該類的實現使用了一個高效的無鎖算法,其算法的是基於podc-1996.pdf所改進的。併發
除了上面所描述的基本特性以外,ConcurrentLinkedQueue中還有一些其餘的特色:工具
因爲是基於鏈表的實現方式,與其餘的併發隊列相似,都會在內部定義一個節點類,ConcurrentLinedQueu亦是如此,首先咱們看一下節點的定義:源碼分析
//該節點是一個靜態內部類,所以其只能做用於該隊列內部 private static class Node<E> { /* * 當前節點存儲的元素,注意到這裏使用volatile關鍵字來對節點進行 * 修飾,其目的是在併發讀的時候保證內存的可見性 */ volatile E item; //當前節點的下一個節點 volatile Node<E> next; /** * 構建新的節點,這裏沒有使用volatile的方式來對節點的元素值進行設置,而是使用普通的寫方式 * 由於對於一個新增的節點,只有在其被成功插入到隊列尾部纔對外可見,所以在這裏沒有對數據可見性的強制要求 */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } /* * 經過Unsafe來完成對當前節點元素的CAS操做 */ boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /* * 使用普通的方式來設置當前節點的下一個節點 */ void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } /* * 經過Unsafe來完成對當前節點的下一個節點的CAS操做 */ boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; //經過Unsafe來獲取一個Node節點的item屬性在內存中相對該對象的位置偏移量 itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); //經過Unsafe來獲取一個Node節點的next節點屬性在內存中相對該對象的位置偏移量 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
上面節點的定義與以前分析的SynchronousQueue中的內部定義的節點很是相似,這裏再也不過多闡述。性能
隊列的屬性定義this
爲了提升快速查找隊列中第一個節點和最後一個節點,所以ConcurrentLinkedQueue中分別定義了一個head節點和tail節點來快速定位。spa
/** * 不變性: * - 隊列中全部未刪除的節點均可以經過head節點的succ方法查找到 * - head節點必定不可能等於null * - (tmp = head).next != tmp,即head的next不能指向本身。 * * 可變性: * - head的item可能爲null,也可能不爲null * - tail節點可能會滯後於head節點,所以從head節點未必必定能夠找到tail節點 * */ private transient volatile Node<E> head; /** * 不變性: * - 節點中的最後一個元素老是能夠經過tail的succ方法來獲取 * - tail節點不等於null * * 可變性: * - head的item可能爲null,也可能不爲null * - tail 節點的next可能指向本身,也可能不指向本身 * - tail節點可能會滯後於head節點,所以從head節點未必必定能夠找到tail節點 */ private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
經過上面的描述咱們知道了該隊列是經過一個頭節點和一個尾節點,而後將中間連接節點之間兩兩相鏈接構成一個隊列,下面讓咱們分析一下ConcurrentLinkedQueue的內部的具體實現。(在這裏須要說明一下,因爲其節點是徹底地基於Unsafe來完成CAS的操做,若是你對該內容還不是很熟悉的話,能夠參考個人深刻分析Java中的原子操做這篇文章,裏面對原子操做有一個比較細緻的描述。
在分析源碼以前,咱們先經過幾張圖來講明一下ConcurrentLinkedQueue的入隊列的總體過程。在瞭解完總體的過程後,再結合源碼去分析細節會更加容易理解:
隊列初始化
隊列初始化狀態
在隊列剛建立時,head和tail同時指向空節點,咱們也稱其爲dummy
節點。
dummy節點的說明
添加元素a
添加元素a
向隊列中添加元素a,此時只是新節點追加到第一個節點的後面,可是tail節點並未發生改變。
添加元素b
添加元素b
向隊列中添加元素b,此時新節點與原先的tail節點之間的距離大於1,所以tail節點在這個時候會更新,真正的指向了最後一個節點
添加元素c
添加元素c
因爲新節點與tail節點的距離沒有大於1,所以此時tail節點一樣不會發生更新。
添加元素d
添加元素d
經過上面的圖示,咱們能夠看到ConcurrentLinkedQueue在入隊列過程當中很是明顯的一個特色就是tail指針不是實時更新的,即tail節點可能會滯後於隊列中真正的最後一個節點,只有當最後的一個節點與tail節點以前的距離大於1時纔會更新,而這樣設計的目的就是爲了減小避免每增長一個節點,tail節點都須要去執行一次CAS操做的狀況發生。
public boolean offer(E e) { // 因爲元素不容許爲null,所以對元素作一個檢查 checkNotNull(e); // 生成待插入的節點 final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 當q等於null時,則p就是隊列中的最後一個元素 if (q == null) { // 執行cas操做,若是執行成功,則newNode成爲隊列的最後一個元素,但它未必是tail指向的元素 if (p.casNext(null, newNode)) { // 經過判斷p!=t,從而肯定當前新節點與tail指向的節點之間的距離是否大於1 // 若是大於1,則須要更新tail指針 if (p != t) casTail(t, newNode); return true; } } // 若是p==q,則意味着當前p節點已經被從隊列中移除(若是單純從入隊列看是看不出來的,後面結合出隊列再回頭分析) else if (p == q) /* 判斷在執行過程當中tail是否發生變化,若是未發生變化,則tail也已經脫落隊列 * 由於 p = t = tail,而p已經脫離隊列,從而推斷出tail也脫離了隊列 * 那麼此時只能從head開始,從新查找隊列的最後一個元素 * 若是tail發生了變化,則直接從當前隊列的tail開始查找隊列的最後一個元素 */ p = (t != (t = tail)) ? t : head; else /** * 因爲p節點的next不爲null,而且p節點並未從隊列中刪除,所以須要繼續查找隊列的最後一個節點 * 判斷執行過程當中tail節點是否發生了變化 * 若是發生了變化,則讓p執行當前的tail,不然就讓p直接指向它的next節點q */ p = (p != t && t != (t = tail)) ? t : q; } }
可能會有很多朋友對上面t != (t = tail)
的處理感到疑惑,疑惑的緣由可能會以爲一個變量本身和本身比較,那不是必定爲true嘛,怎麼還會出現等於false的可能呢。爲了理解這個問題,咱們一塊兒看一下下面的這段代碼:
public class VarCompareTest { static volatile int b = 2; public static void main(String[] args) { int a = b; int c = a != (a = b) ? 5 : 4; System.out.println(a); System.out.println(c); } }
這段代碼的也用到了上面相似的t != (t = tail)
,可是在編譯完成後,咱們經過IDEA自己的反編譯工具來查看一下對應的Class文件的結果(目前因爲對字節碼指令不熟悉,所以不從那個角度去解讀):
反編譯Class文件後的結果
經過上面的代碼,咱們能夠看到,一開始a的值就等於b的值,其值爲2;緊接着a的值首先經過一個局部變量記錄,而後再將b的值賦值給a,因爲b可能存在多線程修改的可能,此時b的值可能被其餘線程改爲了3,所以a的值也會變爲3,最後再拿2與3進行比較,即var10000 != b
進行比較,就會存在等於false的狀況發生了。對於t != (t = tail)
的狀況也是如此,相信經過這個例子說明你們應該明白其中的原理了!
在分析出隊列源碼以前,咱們也結合上面的圖來看一下出隊列的總體過程:
隊列當前狀態
這裏假設隊列是基於上面入隊列以後的狀態進行的。
移除第1個節點
移除第一個節點
本來指向head的節點,此時的next指向了本身,所以它從隊列中真正的移除。存儲a的節點,其item置爲了null,而且head節點發生變動,真正指向了隊列中第一個有效(真正存儲數據)的節點。
移除第2個節點
移除第2個節點
此時只是僅僅將節點的item設置爲了null,可是head節點沒有發生變動,這樣作的目的也是爲了減小一次CAS操做,它會等到下一次纔去變動。
移除第3個節點
移除第3個節點
移除第4個節點
移除第4個節點
看完上圖的分析,相信你們對ConcurrentLinkedQueue出隊列的操做應該有一個直觀的理解了,下面咱們看下源碼的具體實現:
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 若是元素的item不爲null,則說明p節點是當前隊列中的第一個未被刪除的節點 // 此時也說明head指向的節點確實是隊列中的第一個元素 // 經過CAS操做,將item設置爲null,來標記其已經被刪除 if (item != null && p.casItem(item, null)) { // 判斷p節點與head指向的節點是不是同一個,若是不是則須要將head節點向前移動 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 節點的next爲null,則說明隊列爲空,只有一個dummy節點 else if ((q = p.next) == null) { // 嘗試將dummy節點p設置爲新的head updateHead(h, p); return null; } // 若是p節點被刪除,只能從隊列的head從頭開始再次查找 else if (p == q) // 跳回到最外層的循環,從新執行一次Node<E> h = head, p = h, q;操做 continue restartFromHead; else // 因爲head指向的節點其item爲null,即head指向的節點不是一個有效節點,所以繼續經過head的next繼續查找 p = q; } } } final void updateHead(Node<E> h, Node<E> p) { // 判斷新設置的節點與原來的head節點是不是同一個,若是不是則將新節點設置爲新的head節點, // 而且將原來的head節點的next指向本身。 if (h != p && casHead(h, p)) h.lazySetNext(h); }
通常來講,head節點都在隊列的左邊,而tail節點在隊列的右邊。然而,在ConcurrentLinkedQueue中,可能存在tail節點在左邊,而head節點卻跑到了右邊的狀況,這種場景咱們將其稱爲tail lag behind head
。下面咱們分析一下在什麼樣的場景下會發生這樣的狀況,這對於真正理解ConcurrentLinkedQueue很是重要!
添加一個元素
從上圖能夠看到,在添加完一個元素後,tail節點並有發生改變。此時,假設咱們去獲取隊列中的元素,隊列的結構就變成以下的樣子。
取出隊列元素
從上面的結構咱們能夠看到,此時tail節點滯後於head節點,而且咱們此時經過head節點也沒法查找到tail節點,由於該節點已經從隊列中移除。當下一次添加元素的時候,就會出現tail節點本身指向本身的狀況,此時就須要從新獲取到head,將新增的元素追加到head後面。
至此,ConcurrentLinkedQueue的實現咱們已經分析完成了。該類的核心設計就在於CAS的無阻塞以及head/tail節點的延遲更新。儘可能咱們在實際的開發中基本不會去實現一個如此複雜的隊列,可是經過分析一個經典無阻塞隊列,能夠更加好地幫助咱們理解併發編程。若是存在分析不對的地方,還望大神指出。
做者:碼農一枚 連接:https://www.jianshu.com/p/32d6526494fd 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。