ConcurrentLinkedQueue是一種基於鏈表實現的無界非阻塞線程安全隊列,遵循先入先出規則。java
線程安全隊列有兩種實現方式:node
阻塞方式:對入隊和出隊操做加鎖。阻塞隊列。編程
非阻塞方式:經過自旋CAS實現。例如:ConcurrentLinkedQueue安全
下面從源代碼中分析ConcurrentLinkedQueue的實現方法。併發
從類圖能夠看出,ConcurrentLinkedQueue有head和tail兩個volatile域,節點是用靜態內部類Node表示,每一個Node含有元素item和指向下一個節點的指針next,都是volatile變量。ide
Node的item和next兩個域都是volatile變量,保證可見性。casItem和casNext方法使用了UNSAFE提供的CAS方法保證操做的原子性。源碼分析
1 //Node代碼中使用了UNSAFE提供的CAS方法保證操做的原子性, 2 //UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 3 //第一個參數表示要更新的對象,第二個參數nextOffset是Field的偏移量,第三個參數表示指望值,最後一個參數更新後的值。若next域的值等於cmp,則把next域更新爲val並返回true;不然不更新並返回false。 4 private static class Node<E> { 5 volatile E item; //Node值,volatile保證可見性 6 volatile Node<E> next; //Node的下一個元素,volatile保證可見性 7 8 /** 9 * Constructs a new node. Uses relaxed write because item can 10 * only be seen after publication via casNext. 11 */ 12 Node(E item) { 13 UNSAFE.putObject(this, itemOffset, item); 14 } 15 16 boolean casItem(E cmp, E val) { 17 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 18 } 19 20 void lazySetNext(Node<E> val) { 21 UNSAFE.putOrderedObject(this, nextOffset, val); 22 } 23 24 boolean casNext(Node<E> cmp, Node<E> val) { 25 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 26 } 27 28 // Unsafe mechanics 29 30 private static final sun.misc.Unsafe UNSAFE; 31 private static final long itemOffset; 32 private static final long nextOffset; 33 34 static { 35 //初始化UNSAFE和各個域在類中的偏移量 36 try { 37 UNSAFE = sun.misc.Unsafe.getUnsafe();//初始化UNSAFE 38 Class k = Node.class; 39 //itemOffset是指類中item字段在Node類中的偏移量,先經過反射獲取類的item域,而後經過UNSAFE獲取item域在內存中相對於Node類首地址的偏移量。 40 itemOffset = UNSAFE.objectFieldOffset 41 (k.getDeclaredField("item")); 42 //nextOffset是指類中next字段在Node類中的偏移量 43 nextOffset = UNSAFE.objectFieldOffset 44 (k.getDeclaredField("next")); 45 } catch (Exception e) { 46 throw new Error(e); 47 } 48 } 49 }
Node類中的lazySetNext(Node<E> val)方法,能夠理解爲延遲設置Next,內部是使用UNSAFE類的putOrderedObject方法實現,putOrderedXXX方法是putXXXVolatile方法的延遲實現,不保證值的改變被其餘線程當即看到。爲何要lazySetNext這個方法呢?其實它是一種低級別的優化手段,就是在不須要讓共享變量的修改馬上讓其餘線程可見的時候,以設置普通變量的方式來修改共享狀態,能夠減小沒必要要的內存屏障,從而提升程序執行的效率。性能
《Java內存模型中》提到volatile變量能夠實現可見性,其原理就是插入內存屏障以保證不會重排序指令,使用的是store-load內存屏障,開銷較大。UNSAFE類的putOrderedXXX方法則是在指令中插入StoreStore內存屏障,避免發生寫操做重排序,因爲StoreStore屏障的性能損耗小於StoreLoad屏障,因此lazySetNext方法比直接寫volatile變量的性能要高。須要注意的是,StoreStore屏障僅能夠避免寫寫重排序,不保證內存可見性。優化
在出隊操做中更新Queue的Head節點時用到了lazySetNext(Node<E> val)方法,將舊head節點的next指向本身。this
建立一個空的Queue,head節點爲null且tail節點等於head節點。
1 //建立一個空的Queue,head節點爲null且tail節點等於head節點 2 public ConcurrentLinkedQueue() { 3 head = tail = new Node<E>(null); 4 5 }
入隊的方法爲offer,向隊列的尾部插入指定的元素,因爲ConcurrentLinkedQueue是無界的,因此offer永遠返回true,不能經過返回值來判斷是否入隊成功。
入隊大體有如下幾個步驟:
1)根據tail節點定位出尾節點(last node);
2)將新節點置爲尾節點的下一個節點;
3)更新尾節點casTail。
1 //向隊列的尾部插入指定的元素 2 public boolean offer(E e) { 3 checkNotNull(e); 4 final Node<E> newNode = new Node<E>(e);//構造新Node 5 //循環CAS直到入隊成功。一、根據tail節點定位出尾節點(last node);二、將新節點置爲尾節點的下一個節點,三、更新尾節點casTail。 6 for (Node<E> t = tail, p = t;;) { 7 Node<E> q = p.next; 8 if (q == null) { //判斷p是否是尾節點,tail節點不必定是尾節點,判斷是否是尾節點的依據是該節點的next是否是null 9 // p is last node 10 if (p.casNext(null, newNode)) { 11 //設置P節點的下一個節點爲新節點,若是p的next爲null,說明p是尾節點,casNext返回true;若是p的next不爲null,說明有其餘線程更新過隊列的尾節點,casNext返回false。 12 // Successful CAS is the linearization point 13 // for e to become an element of this queue, 14 // and for newNode to become "live". 15 if (p != t) // hop two nodes at a time 16 casTail(t, newNode); // Failure is OK. 17 return true; 18 } 19 // Lost CAS race to another thread; re-read next 20 } 21 else if (p == q) 22 //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向本身 23 // We have fallen off list. If tail is unchanged, it 24 // will also be off-list, in which case we need to 25 // jump to head, from which all live nodes are always 26 // reachable. Else the new tail is a better bet. 27 p = (t != (t = tail)) ? t : head; 28 else 29 // Check for tail updates after two hops. 30 p = (p != t && t != (t = tail)) ? t : q; 31 //判斷tail節點有沒有被更新,若是沒被更新,1)p=q:p指向p.next繼續尋找尾節點; 32 //若是被更新了,2)p=t:P賦值爲新的tail節點 33 //p != t && t != (t = tail)是怎麼執行的?見隨筆附錄《經過字節碼指令分析 p != t && t != (t = tail) 語句的執行》 34 //什麼狀況下p!=t.只有本分支和else if (p == q)分支含有更新變量p和t的語句,因此在p!=t出現以前已經循環過這兩個分支至少一次。 35 36 } 37 } 38 39 private boolean casTail(Node<E> cmp, Node<E> val) { 40 return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); 41 }
須要注意的是:tail不老是尾節點(last node)。DougLea大師爲何這麼設計呢?把tail節點永遠做爲Queue的尾節點實現起來不是更簡單嗎?
下面是tail節點永遠做爲Queue的尾節點的入隊方法代碼:
1 public boolean offer(E e) { 2 if (e == null) 3 throw new NullPointerException(); 4 Node<E> n = new Node<E>(e); 5 for (;;) { 6 Node<E> t = tail; 7 //此處若是casNext成功,那麼casTail可能會成功。由於在這兩個原子操做期間,其餘線程的casNext操做都會失敗,以後的casTail不會被執行,即tail節點不變。 8 if (t.casNext(null, n) && casTail(t, n)) { 9 return true; 10 } 11 } 12 }
這麼作的缺點是每次入隊都會自旋CAS更新tail節點,入隊效率會下降,而DougLea的設計經過hops變量來減小入隊時減小更新tail節點的次數,默認狀況下hops爲1。當tail節點與尾節點的距離大於等於hops值時才更新Queue的tail節點。這樣帶來的壞處是入隊時須要根據tail定位尾節點,hops的值越大,定位時間就越長。DougLea的設計思想是經過增長對volatile變量的讀來減小對volatile變量的寫,而寫操做的開銷遠遠大於讀操做。因此從整體上來講入隊效率是提高的。
和入隊類似,出隊時也不是每次都會更新head節點,當head節點的item不爲null時,直接彈出item;不然會更新head節點。更新head節點成功時,會把舊的head節點指向本身。
1 public E poll() { 2 restartFromHead: 3 //兩層循環 4 for (;;) { 5 for (Node<E> h = head, p = h, q;;) { 6 E item = p.item; 7 8 if (item != null && p.casItem(item, null)) { 9 // Successful CAS is the linearization point 10 // for item to be removed from this queue. 11 if (p != h) // hop two nodes at a time 12 updateHead(h, ((q = p.next) != null) ? q : p); 13 return item; 14 } 15 //隊列爲空,更新head節點 16 else if ((q = p.next) == null) { 17 updateHead(h, p); 18 return null; 19 } 20 else if (p == q) 21 //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向本身。 22 //從新從head節點開始 23 continue restartFromHead; 24 else 25 p = q; //將p執行p的下一個節點 26 } 27 } 28 } 29 30 //更新head節點 31 final void updateHead(Node<E> h, Node<E> p) { 32 //經過CAS將head更新爲P 33 if (h != p && casHead(h, p)) 34 h.lazySetNext(h);//把舊的head節點指向本身 35 } 36 37 void lazySetNext(Node<E> val) { 38 UNSAFE.putOrderedObject(this, nextOffset, val); 39 }
注意:size()須要遍歷隊列中的全部元素,時間複雜度爲O(n),開銷較大。而且若是在遍歷的過程當中,Queue有入隊或出隊的操做,會致使該方法統計的結果不許確。因此size()方法不太有用。那如何判斷Queue是否爲空呢?使用isEmpty()方法,判斷第一個節點是否爲null,時間複雜度爲O(1)
1 public int size() { 2 int count = 0; 3 for (Node<E> p = first(); p != null; p = succ(p)) 4 if (p.item != null) 5 // Collection.size() spec says to max out 6 if (++count == Integer.MAX_VALUE) 7 break; 8 return count; 9 }
在讀ConcurrentLinkedQueue源代碼時,在入隊方法的定位尾節點中讀到 p = (p != t && t != (t = tail)) ? t : q; 語句,不太理解 p != t && t != (t = tail) 的執行順序,遂經過反彙編語句仔細研究一下。
咱們都知道 A && B 運算,在A不知足條件的狀況下,B將不會執行。那在字節碼指令中是怎麼實現的呢?
經過如下代碼模擬:
1 public class Test { 2 public static void main(String[] args) { 3 int t = 8; 4 int p = t; 5 int tail = 9; 6 boolean result = (p != t && t != (t = tail)); 7 System.out.println("p=" + p + ", t=" + t + ", result=" + result); 8 } 9 }
不出所料,運行結果爲p=8, t=8, result=false。t=8說明沒有執行t != (t = tail)語句。
看反彙編後的字節碼指令:
1 public class Test { 2 public static void main(java.lang.String[] args); 3 0 bipush 8 //將單字節常量(-128~127)壓入棧頂 4 2 istore_1 [t] //將棧頂int型數值存入第二個本地變量,即賦值給變量t,同時常量8出棧 5 3 iload_1 [t] //將第二個int型本地變量(t)壓入棧頂 6 4 istore_2 [p] //將棧頂int型數值存入第三個本地變量,即賦值給變量P,同時t出棧 7 5 bipush 9 8 7 istore_3 [tail] 9 8 iload_2 [p] 10 9 iload_1 [t] 11 10 if_icmpeq 24 //比較棧頂兩int型數值大小,當結果等於0時跳轉。即比較p!=t,結果爲false(0),跳轉到24行,同時p和t出棧 12 13 iload_1 [t] 13 14 iload_3 [tail] 14 15 dup 15 16 istore_1 [t] 16 17 if_icmpeq 24 17 20 iconst_1 18 21 goto 25 19 24 iconst_0 //將int型0壓入棧頂。 20 25 istore 4 [result] //將棧頂int型數值存入指定本地變量。即將result賦值爲0(false) 21 27 return 22 }
接下來再看一下第一個條件成立時的狀況。代碼將p != t改成p == t:
1 public class Test { 2 public static void main(String[] args) { 3 int t = 8; 4 int p = t; 5 int tail = 9; 6 boolean result = (p == t && t != (t = tail)); 7 System.out.println("p=" + p + ", t=" + t + ", result=" + result); 8 } 9 }
先來看運行結果p=8, t=9, result=true。說明執行了t != (t = tail)語句。
看反彙編後的字節碼指令:
1 public class Test { 2 public static void main(java.lang.String[] args); 3 0 bipush 8 4 2 istore_1 [t] 5 3 iload_1 [t] 6 4 istore_2 [p] 7 5 bipush 9 8 7 istore_3 [tail] 9 8 iload_2 [p] 10 9 iload_1 [t] 11 10 if_icmpne 24 //比較棧頂兩int型數值大小,當結果不等於0時跳轉。即比較p == t,結果爲true(1)。因此不會跳轉到24行,繼續執行下一行。 12 13 iload_1 [t] //將變量t壓入棧頂,此時t=8 13 14 iload_3 [tail] //將變量tail壓入棧頂,tail=9 14 15 dup //複製棧頂數值並將複製值壓入棧頂。即複製tail變量值並壓入棧頂,tail=9 15 16 istore_1 [t] //將棧頂數值存入t變量,同時出棧 16 17 if_icmpeq 24 //比較棧頂兩int型數值大小,當結果等於0時跳轉。此時棧頂有九、8。比較9!=8,結果爲true(1)。因此不會跳轉到24行,繼續執行下一行。 17 20 iconst_1 //將int型1壓入棧頂 18 21 goto 25 //無條件跳轉到25行 19 24 iconst_0 20 25 istore 4 [result] //將棧頂1存入result,同時出棧。即result返回true 21 27 return 22 }
經過字節碼指令分析可知,編譯器是經過if_icmpeq和if_icmpne比較並條件跳轉指令實現&&短路與運算的。在第二種狀況中,還分析了t != (t = tail)語句的執行過程,理解會更加深刻。
《Java併發編程的藝術》
ConcurrentLinkedQueue源碼分析(http://www.jianshu.com/p/7816c1361439)