Java併發容器--ConcurrentLinkedQueue

概述

  ConcurrentLinkedQueue是一種基於鏈表實現的無界非阻塞線程安全隊列,遵循先入先出規則。java

  線程安全隊列有兩種實現方式:node

    阻塞方式:對入隊和出隊操做加鎖。阻塞隊列。編程

    非阻塞方式:經過自旋CAS實現。例如:ConcurrentLinkedQueue安全

  下面從源代碼中分析ConcurrentLinkedQueue的實現方法。併發

類關係圖

      

    從類圖能夠看出,ConcurrentLinkedQueue有head和tail兩個volatile域,節點是用靜態內部類Node表示,每一個Node含有元素item和指向下一個節點的指針next,都是volatile變量。ide

源碼分析

  Node源碼

    Node的item和next兩個域都是volatile變量,保證可見性。casItem和casNext方法使用了UNSAFE提供的CAS方法保證操做的原子性。源碼分析

 1         //Node代碼中使用了UNSAFE提供的CAS方法保證操做的原子性,
 2         //UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
 3         //第一個參數表示要更新的對象,第二個參數nextOffset是Field的偏移量,第三個參數表示指望值,最後一個參數更新後的值。若next域的值等於cmp,則把next域更新爲val並返回true;不然不更新並返回false。
 4         private static class Node<E> {
 5             volatile E item;    //Node值,volatile保證可見性
 6             volatile Node<E> next;    //Node的下一個元素,volatile保證可見性
 7 
 8             /**
 9              * Constructs a new node.  Uses relaxed write because item can
10              * only be seen after publication via casNext.
11              */
12             Node(E item) {
13                 UNSAFE.putObject(this, itemOffset, item);
14             }
15 
16             boolean casItem(E cmp, E val) {
17                 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
18             }
19 
20             void lazySetNext(Node<E> val) {
21                 UNSAFE.putOrderedObject(this, nextOffset, val);
22             }
23 
24             boolean casNext(Node<E> cmp, Node<E> val) {
25                 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
26             }
27 
28             // Unsafe mechanics
29 
30             private static final sun.misc.Unsafe UNSAFE;
31             private static final long itemOffset;
32             private static final long nextOffset;
33 
34             static {
35                 //初始化UNSAFE和各個域在類中的偏移量
36                 try {
37                     UNSAFE = sun.misc.Unsafe.getUnsafe();//初始化UNSAFE
38                     Class k = Node.class;
39                     //itemOffset是指類中item字段在Node類中的偏移量,先經過反射獲取類的item域,而後經過UNSAFE獲取item域在內存中相對於Node類首地址的偏移量。
40                     itemOffset = UNSAFE.objectFieldOffset
41                         (k.getDeclaredField("item"));
42                     //nextOffset是指類中next字段在Node類中的偏移量
43                     nextOffset = UNSAFE.objectFieldOffset
44                         (k.getDeclaredField("next"));
45                 } catch (Exception e) {
46                     throw new Error(e);
47                 }
48             }
49         }
View Code

    Node類中的lazySetNext(Node<E> val)方法,能夠理解爲延遲設置Next,內部是使用UNSAFE類的putOrderedObject方法實現,putOrderedXXX方法是putXXXVolatile方法的延遲實現,不保證值的改變被其餘線程當即看到。爲何要lazySetNext這個方法呢?其實它是一種低級別的優化手段,就是在不須要讓共享變量的修改馬上讓其餘線程可見的時候,以設置普通變量的方式來修改共享狀態,能夠減小沒必要要的內存屏障,從而提升程序執行的效率。性能

    《Java內存模型中》提到volatile變量能夠實現可見性,其原理就是插入內存屏障以保證不會重排序指令,使用的是store-load內存屏障,開銷較大。UNSAFE類的putOrderedXXX方法則是在指令中插入StoreStore內存屏障,避免發生寫操做重排序,因爲StoreStore屏障的性能損耗小於StoreLoad屏障,因此lazySetNext方法比直接寫volatile變量的性能要高。須要注意的是,StoreStore屏障僅能夠避免寫寫重排序,不保證內存可見性。優化

    在出隊操做中更新Queue的Head節點時用到了lazySetNext(Node<E> val)方法,將舊head節點的next指向本身。this

  初始化

    建立一個空的Queue,head節點爲null且tail節點等於head節點。

1             //建立一個空的Queue,head節點爲null且tail節點等於head節點
2             public ConcurrentLinkedQueue() {
3                 head = tail = new Node<E>(null);
4         
5             }
View Code

 

  入隊

    入隊的方法爲offer,向隊列的尾部插入指定的元素,因爲ConcurrentLinkedQueue是無界的,因此offer永遠返回true,不能經過返回值來判斷是否入隊成功。

    入隊大體有如下幾個步驟:

      1)根據tail節點定位出尾節點(last node);

      2)將新節點置爲尾節點的下一個節點;

      3)更新尾節點casTail。

 1         //向隊列的尾部插入指定的元素
 2         public boolean offer(E e) {
 3             checkNotNull(e);
 4             final Node<E> newNode = new Node<E>(e);//構造新Node
 5             //循環CAS直到入隊成功。一、根據tail節點定位出尾節點(last node);二、將新節點置爲尾節點的下一個節點,三、更新尾節點casTail。
 6             for (Node<E> t = tail, p = t;;) {
 7                 Node<E> q = p.next;
 8                 if (q == null) {    //判斷p是否是尾節點,tail節點不必定是尾節點,判斷是否是尾節點的依據是該節點的next是否是null
 9                     // p is last node
10                     if (p.casNext(null, newNode)) {    
11                     //設置P節點的下一個節點爲新節點,若是p的next爲null,說明p是尾節點,casNext返回true;若是p的next不爲null,說明有其餘線程更新過隊列的尾節點,casNext返回false。
12                         // Successful CAS is the linearization point
13                         // for e to become an element of this queue,
14                         // and for newNode to become "live".
15                         if (p != t) // hop two nodes at a time
16                             casTail(t, newNode);  // Failure is OK.
17                         return true;
18                     }
19                     // Lost CAS race to another thread; re-read next
20                 }
21                 else if (p == q)
22                     //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向本身
23                     // We have fallen off list.  If tail is unchanged, it
24                     // will also be off-list, in which case we need to
25                     // jump to head, from which all live nodes are always
26                     // reachable.  Else the new tail is a better bet.
27                     p = (t != (t = tail)) ? t : head;
28                 else
29                     // Check for tail updates after two hops.
30                     p = (p != t && t != (t = tail)) ? t : q;
31                     //判斷tail節點有沒有被更新,若是沒被更新,1)p=q:p指向p.next繼續尋找尾節點;
32                     //若是被更新了,2)p=t:P賦值爲新的tail節點
33                     //p != t && t != (t = tail)是怎麼執行的?見隨筆附錄《經過字節碼指令分析 p != t && t != (t = tail) 語句的執行》
34                     //什麼狀況下p!=t.只有本分支和else if (p == q)分支含有更新變量p和t的語句,因此在p!=t出現以前已經循環過這兩個分支至少一次。
35                     
36             }
37         }
38         
39         private boolean casTail(Node<E> cmp, Node<E> val) {
40             return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
41         }
View Code

    須要注意的是:tail不老是尾節點(last node)。DougLea大師爲何這麼設計呢?把tail節點永遠做爲Queue的尾節點實現起來不是更簡單嗎?

    下面是tail節點永遠做爲Queue的尾節點的入隊方法代碼:

 1         public boolean offer(E e) {
 2             if (e == null)
 3                 throw new NullPointerException();
 4             Node<E> n = new Node<E>(e);
 5             for (;;) {
 6                 Node<E> t = tail;
 7                 //此處若是casNext成功,那麼casTail可能會成功。由於在這兩個原子操做期間,其餘線程的casNext操做都會失敗,以後的casTail不會被執行,即tail節點不變。
 8                 if (t.casNext(null, n) && casTail(t, n)) {
 9                     return true;
10                 }
11             }
12         }
View Code

    這麼作的缺點是每次入隊都會自旋CAS更新tail節點,入隊效率會下降,而DougLea的設計經過hops變量來減小入隊時減小更新tail節點的次數,默認狀況下hops爲1。當tail節點與尾節點的距離大於等於hops值時才更新Queue的tail節點。這樣帶來的壞處是入隊時須要根據tail定位尾節點,hops的值越大,定位時間就越長。DougLea的設計思想是經過增長對volatile變量的讀來減小對volatile變量的寫,而寫操做的開銷遠遠大於讀操做。因此從整體上來講入隊效率是提高的。

 

  出隊

    和入隊類似,出隊時也不是每次都會更新head節點,當head節點的item不爲null時,直接彈出item;不然會更新head節點。更新head節點成功時,會把舊的head節點指向本身。

 1             public E poll() {
 2                 restartFromHead:
 3                 //兩層循環
 4                 for (;;) {
 5                     for (Node<E> h = head, p = h, q;;) {
 6                         E item = p.item;
 7 
 8                         if (item != null && p.casItem(item, null)) {
 9                             // Successful CAS is the linearization point
10                             // for item to be removed from this queue.
11                             if (p != h) // hop two nodes at a time
12                                 updateHead(h, ((q = p.next) != null) ? q : p);
13                             return item;
14                         }
15                         //隊列爲空,更新head節點
16                         else if ((q = p.next) == null) {
17                             updateHead(h, p);
18                             return null;
19                         }
20                         else if (p == q)
21                             //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向本身。
22                             //從新從head節點開始
23                             continue restartFromHead;
24                         else
25                             p = q;    //將p執行p的下一個節點
26                     }
27                 }
28             }
29             
30             //更新head節點
31             final void updateHead(Node<E> h, Node<E> p) {
32                 //經過CAS將head更新爲P
33                 if (h != p && casHead(h, p))
34                     h.lazySetNext(h);//把舊的head節點指向本身
35             }
36             
37             void lazySetNext(Node<E> val) {
38                 UNSAFE.putOrderedObject(this, nextOffset, val);
39             }
View Code

  

  隊列大小

    注意:size()須要遍歷隊列中的全部元素,時間複雜度爲O(n),開銷較大。而且若是在遍歷的過程當中,Queue有入隊或出隊的操做,會致使該方法統計的結果不許確。因此size()方法不太有用。那如何判斷Queue是否爲空呢?使用isEmpty()方法,判斷第一個節點是否爲null,時間複雜度爲O(1)

1         public int size() {
2             int count = 0;
3             for (Node<E> p = first(); p != null; p = succ(p))
4                 if (p.item != null)
5                     // Collection.size() spec says to max out
6                     if (++count == Integer.MAX_VALUE)
7                         break;
8             return count;
9         }
View Code

 

 

附錄:經過字節碼指令分析 p != t && t != (t = tail) 語句的執行

  在讀ConcurrentLinkedQueue源代碼時,在入隊方法的定位尾節點中讀到 p = (p != t && t != (t = tail)) ? t : q; 語句,不太理解 p != t && t != (t = tail) 的執行順序,遂經過反彙編語句仔細研究一下。

  咱們都知道 A && B 運算,在A不知足條件的狀況下,B將不會執行。那在字節碼指令中是怎麼實現的呢?

  經過如下代碼模擬:

1             public class Test {
2                 public static void main(String[] args) {
3                     int t = 8;
4                     int p = t;
5                     int tail = 9;
6                     boolean result = (p != t && t != (t = tail));
7                     System.out.println("p=" + p + ", t=" + t + ", result=" + result);
8                 }
9             }
View Code

 

  不出所料,運行結果爲p=8, t=8, result=false。t=8說明沒有執行t != (t = tail)語句。

  看反彙編後的字節碼指令:

 1         public class Test {
 2           public static void main(java.lang.String[] args);
 3              0  bipush 8                //將單字節常量(-128~127)壓入棧頂
 4              2  istore_1 [t]            //將棧頂int型數值存入第二個本地變量,即賦值給變量t,同時常量8出棧
 5              3  iload_1 [t]                //將第二個int型本地變量(t)壓入棧頂 
 6              4  istore_2 [p]            //將棧頂int型數值存入第三個本地變量,即賦值給變量P,同時t出棧
 7              5  bipush 9                
 8              7  istore_3 [tail]
 9              8  iload_2 [p]
10              9  iload_1 [t]
11             10  if_icmpeq 24            //比較棧頂兩int型數值大小,當結果等於0時跳轉。即比較p!=t,結果爲false(0),跳轉到24行,同時p和t出棧
12             13  iload_1 [t]
13             14  iload_3 [tail]
14             15  dup
15             16  istore_1 [t]
16             17  if_icmpeq 24
17             20  iconst_1
18             21  goto 25
19             24  iconst_0                //將int型0壓入棧頂。
20             25  istore 4 [result]        //將棧頂int型數值存入指定本地變量。即將result賦值爲0(false)
21             27  return
22         }
View Code

  接下來再看一下第一個條件成立時的狀況。代碼將p != t改成p == t:

1             public class Test {
2                 public static void main(String[] args) {
3                     int t = 8;
4                     int p = t;
5                     int tail = 9;
6                     boolean result = (p == t && t != (t = tail));
7                     System.out.println("p=" + p + ", t=" + t + ", result=" + result);
8                 }
9             }
View Code

 

  先來看運行結果p=8, t=9, result=true。說明執行了t != (t = tail)語句。

  看反彙編後的字節碼指令:

 1         public class Test {
 2           public static void main(java.lang.String[] args);
 3              0  bipush 8
 4              2  istore_1 [t]
 5              3  iload_1 [t]
 6              4  istore_2 [p]
 7              5  bipush 9
 8              7  istore_3 [tail]
 9              8  iload_2 [p]
10              9  iload_1 [t]
11             10  if_icmpne 24            //比較棧頂兩int型數值大小,當結果不等於0時跳轉。即比較p == t,結果爲true(1)。因此不會跳轉到24行,繼續執行下一行。
12             13  iload_1 [t]                //將變量t壓入棧頂,此時t=8
13             14  iload_3 [tail]            //將變量tail壓入棧頂,tail=9
14             15  dup                        //複製棧頂數值並將複製值壓入棧頂。即複製tail變量值並壓入棧頂,tail=9
15             16  istore_1 [t]            //將棧頂數值存入t變量,同時出棧
16             17  if_icmpeq 24            //比較棧頂兩int型數值大小,當結果等於0時跳轉。此時棧頂有九、8。比較9!=8,結果爲true(1)。因此不會跳轉到24行,繼續執行下一行。
17             20  iconst_1                //將int型1壓入棧頂
18             21  goto 25                    //無條件跳轉到25行
19             24  iconst_0
20             25  istore 4 [result]        //將棧頂1存入result,同時出棧。即result返回true
21             27  return
22         }
View Code

 

  經過字節碼指令分析可知,編譯器是經過if_icmpeq和if_icmpne比較並條件跳轉指令實現&&短路與運算的。在第二種狀況中,還分析了t != (t = tail)語句的執行過程,理解會更加深刻。

 

參考資料:

  《Java併發編程的藝術》

  ConcurrentLinkedQueue源碼分析(http://www.jianshu.com/p/7816c1361439)

相關文章
相關標籤/搜索