Java同步數據結構之ConcurrentLinkedQueue

前言

前面介紹的Queue都是經過Lock鎖實現的阻塞隊列,今天介紹一種非阻塞隊列ConcurrentLinkedQueue,所謂非阻塞,其實就是經過CAS代替加鎖來實現的高效的非阻塞隊列。當許多線程共享對公共集合的訪問時,ConcurrentLinkedQueue是一個合適的選擇。與大多數其餘併發集合實現同樣,該類不容許使用空元素。node

ConcurrentLinkedQueue是一個基於鏈表的無界線程安全的先進先出隊列。雖然前面介紹的隊列也有基於鏈表的實現,例如LinkedBlockingQueue以及SynchronousQueue的公平隊列實現,可是ConcurrentLinkedQueue的鏈表實現與它們有本質的差異,LinkedBlockingQueue的鏈表實現存在老是指向第一個節點的虛擬head節點,以及始終指向隊列最後一個節點的tail節點,可是ConcurrentLinkedQueue的head與tail則更加靈活多變,ConcurrentLinkedQueue有以下一些基本約束特性數組

1.CAS入隊的最後一個節點的next指向爲null。
2.隊列中的全部未刪除節點是那些item不爲null,而且都能從head節點訪問到的節點,由於刪除節點是經過CAS將其item引用置爲null。迭代器會跳過那些item爲null的節點。因此若是隊列是空的,那麼全部item固然都必須爲空。
3.head並不老是指向隊列的第一個元素,tail也並不老是指向隊列的最後一個節點。安全

針對ConcurrentLinkedQueue的headtail節點,有以下一些特性:多線程

  不變性 可變性
head

1.全部未刪除的節點均可以從head節點經過succ()方法訪問到併發

2.head不會爲nullapp

3.head節點的next不會指向自身異步

1.head的item可能爲null,也可能不爲nullide

2.容許tail滯後於head,即容許從head經過succ()不能訪問到tail。高併發

tail

1.最後一個節點老是能夠從tail經過succ()方法訪問到優化

2.tail不會爲null

1.tail的item可能爲null,也可能不爲null

2.容許tail滯後於head,即容許從head經過succ()不能訪問到tail。

3.tail節點的next能夠指向自身,也能夠不指向自身。

源碼解析

進行源碼解析以前,先看看ConcurrentLinkedQueue定義的內部節點類Node:

 1 private static class Node<E> {  2     volatile E item;  3     volatile Node<E> next;  4 
 5     /**
 6  * Constructs a new node. Uses relaxed write because item can  7  * only be seen after publication via casNext.  8      */
 9  Node(E item) { 10         UNSAFE.putObject(this, itemOffset, item); 11  } 12 
13     boolean casItem(E cmp, E val) { 14         return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 15  } 16 
17     void lazySetNext(Node<E> val) { 18         UNSAFE.putOrderedObject(this, nextOffset, val); 19  } 20 
21     boolean casNext(Node<E> cmp, Node<E> val) { 22         return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 23  } 24 
25     // Unsafe mechanics
26 
27     private static final sun.misc.Unsafe UNSAFE; 28     private static final long itemOffset; 29     private static final long nextOffset; 30 
31     static { 32         try { 33             UNSAFE = sun.misc.Unsafe.getUnsafe(); 34             Class<?> k = Node.class; 35             itemOffset = UNSAFE.objectFieldOffset 36                 (k.getDeclaredField("item")); 37             nextOffset = UNSAFE.objectFieldOffset 38                 (k.getDeclaredField("next")); 39         } catch (Exception e) { 40             throw new Error(e); 41  } 42  } 43 }
Node節點內部類

Node節點內部類很簡單,只包含節點數據item以及指向下一個節點的next引用,和其它一些輔助方法,這裏不表,接下來看構造方法吧。

構造方法

 1 private transient volatile Node<E> head;  2 
 3 private transient volatile Node<E> tail;  4 
 5 /**
 6  * 構造一個空隊列  7  */
 8 public ConcurrentLinkedQueue() {  9     head = tail = new Node<E>(null); 10 } 11 
12 /**
13  * 構造一個包含給定集合元素的ConcurrentLinkedQueue實例 14  */
15 public ConcurrentLinkedQueue(Collection<? extends E> c) { 16     Node<E> h = null, t = null; 17     for (E e : c) { //遍歷
18  checkNotNull(e); 19         Node<E> newNode = new Node<E>(e); 20         if (h == null) 21             h = t = newNode; 22         else { 23             //這裏使用延遲賦值是由於後面對head、tail的寫是volatile寫因此能夠保證可見性 24             //延遲賦值採用putOrderedObject方法只關注不被重排序便可。
25  t.lazySetNext(newNode); 26             t = newNode; 27  } 28  } 29     if (h == null) 30         h = t = new Node<E>(null); 31     head = h; 32     tail = t; 33 }
View Code

構造方法也很簡單,當構造空隊列的實例時,其head於tail都指向同一個item爲null的虛擬節點;當以給定集合構造實例時,head指向第一個元素,tail指向最後一個元素。

入隊offer

ConcurrentLinkedQueue的入隊操做add、offer都是經過offer方法實現的,它將指定的元素插入到隊列的最後一個元素後面,下面看看其源碼:

 1 /**
 2  * 將指定的元素插入到此隊列的末尾。  3  * 由於隊列是無界的,因此這個方法永遠不會返回false。  4  *  5  * 若是指定的元素爲null,拋出NullPointerException  6  */
 7 public boolean offer(E e) {  8  checkNotNull(e);  9     final Node<E> newNode = new Node<E>(e); 10 
11     for (Node<E> t = tail, p = t;;) {//死循環,直到成功
12         Node<E> q = p.next; 13         //第一個if塊
14         if (q == null) { //p是最後一個節點,嘗試加入到隊列最後一個元素後面
15             if (p.casNext(null, newNode)) { //CAS 嘗試將新節點掛載到最後一個節點的next 16                 // Successful CAS is the linearization point 17                 // for e to become an element of this queue, 18                 // and for newNode to become "live".
19                 if (p != t) //最後一個節點距離tail大於1個節點時,更新tail指向最後一個節點
20                     casTail(t, newNode);  // Failure is OK. 爲何容許失敗。由於tail可能被其它操做搶先設置好了
21                 return true; 22             } //CAS失敗,從新循環嘗試。 23             // Lost CAS race to another thread; re-read next
24         }//第二個if塊
25         else if (p == q) //到這裏說明p節點已經被移除了。 26             // We have fallen off list. If tail is unchanged, it 27             // will also be off-list, in which case we need to 28             // jump to head, from which all live nodes are always 29             // reachable. Else the new tail is a better bet. 30             //tail節點已經被更新就取新的tail,不然取head,從新循環尋找最後一個節點嘗試。
31             p = (t != (t = tail)) ? t : head; 32         else //第三個if塊 33             //到這裏說明tail並非指向的隊列中最後一個元素。 34             
35             //這時候若是tail已經被其它操做更新了則取最新的tail,不然取p的next節點繼續循環嘗試。 36             // Check for tail updates after two hops.
37             p = (p != t && t != (t = tail)) ? t : q; 38  } 39 }
View Code

由於隊列的無界的,因此操做在一個死循環中嘗試,直到成功返回true。入隊操做主要分三種狀況(三個if塊):

1. 找到了隊列的最後一個元素(其next指向null),嘗試將新節點加到其next,成功返回失敗繼續循環嘗試。

2. 當前節點被移除了,須要從新定位起點進行循環找到最後一個元素,定位到哪兒取決於操做期間tail是否改變,tail改變則重新tail開始,不然只能從head開始。

3. 當前節點不是最後一個元素也沒有被移除,這時候若是tail改變了則重新tail開始,不然直接從當前節點的next節點開始繼續循環查找最後一個元素。

值得注意的是,在將新元素節點插入到最後一個節點以後,若是當前tail指向的節點距離隊列最後一個節點超過一個節點時須要更新tail使其指向隊列最後一個節點,不然不對tail作更新。因而可知,tail要麼指向隊列最後一個節點,要麼指向隊列的倒數第二個節點。若是發現有節點next指向自身則會認爲是被移除的節點,從而摒棄它從新定位起點尋找最後一個節點。

下面以入隊操做的圖例來講明這個隊列的變化過程,首先假設隊列最開始是一個空隊列,那麼其head、tail都指向一個item爲null的虛擬節點,以下所示:

offer("A")

按照入隊的邏輯分析過程爲:tail指向的虛擬節點p的next爲null,即q==p.next==null,因此直接執行第一個if塊:p.casNext(null, newNode)將新節點掛載到虛擬節點p的next,成功以後因爲tail指向的虛擬節點p==t,因此不會執行casTail(t, newNode)來更新tail指向剛纔入隊的節點,結束返回true,這時候隊列的狀態以下:

如上圖所示,在入隊第一個元素以後,head和tail亦然指向的當初的虛擬節點,虛擬節點指向剛纔入隊的節點A,這時候節點A就是隊列的最後一個節點,tail並無指向它。

offer("B")

接着咱們再入隊一個元素B,按照入隊邏輯分析過程爲:tail指向的虛擬節點p的next爲A不爲空,因此第一個if塊不成立,而且q=p.next=A !=p,因此第二個if塊也不成立,進入第三個if塊執行p = (p != t && t != (t = tail)) ? t : q,假設此時tail並無被其它線程更新,因此三目運算的條件不成立,p=q即p==A,進入第二輪循環。第二輪循環此時p指向節點A,其next爲null,因此第一個if塊成立,將新節點B插入A節點的next,這時候由於p指向節點A,t即tail依然指向虛擬節點1.因此p != t 執行casTail(t, newNode)將更新tail執行新入隊的節點B,結束返回true,這時候隊列的狀態以下:

如上圖所示,在節點B入隊以後,head依然指向當初的虛擬節點,但tail節點被更新指向到了最後一個節點,因此ConcurrentLinkedQueue會保證tail節點與隊列最後一個節點直接的距離不超過一個節點,當超過期會更新tail指向最後一個節點,依次類推,加入咱們再執行offer("C"),那麼隊列將變成這樣:

tail的這種並非每次都須要更新的策略能夠提升程序的併發度,儘可能減小沒必要要的CAS操做,可是程序的複雜度不言而喻變得更復雜難懂。

出隊poll()

 1 public E poll() {  2  restartFromHead:  3     for (;;) {  4         for (Node<E> h = head, p = h, q;;) {  5             E item = p.item;  6             
 7             //第一個if塊,p.item 不爲null,則將item 設置爲null
 8             if (item != null && p.casItem(item, null)) {  9                 //出隊的p節點不是head,若是p後面還有節點更新head指向p.next,不然指向p,原head指向自身
10                 if (p != h) // hop two nodes at a time
11                     updateHead(h, ((q = p.next) != null) ? q : p); 12                 return item; 13             }//第二個if塊,到這裏說明p節點item爲null或者已經被其它線程搶先拿走(casItem失敗), 14             // p.next == null 說明被其它線程搶先拿走的p是隊列中最後一個有效節點,如今隊列已經空了
15             else if ((q = p.next) == null) { 16                 updateHead(h, p); //更新head指向p,將原來的head的next指向自身
17                 return null; 18             }//第三個if塊,到這裏說明p已經被其它線程搶先拿走了,須要從新開始循環
19             else if (p == q) 20                 continue restartFromHead; 21             else//第四個if塊,到這裏說明p是一個虛擬節點,而且隊列不爲空,繼續看它的下一個節點q
22                 p = q; 23  } 24  } 25 }
View Code

入隊邏輯主要在tail節點處作處理,而出隊固然是從頭部出隊,因此在head節點處作處理,出隊的邏輯也是由包含4個if塊的死循環構成:

1. 從head處出發發現第一個item不爲空的節點,則CAS更新item爲null,若是出隊的節點不是指向head就須要將head指向新的節點(隊列不爲空指向下一個節點,爲空指向出隊的節點),原head的next指向自身。

2. 發現隊列爲空了,更新head指向最後一個無效節點(item,next都爲null),原head的next指向自身。

3. 節點搶先被其它線程拿走了,則從新從head處開始尋找第一個有效節點(item不爲空)

4. 若是當前節點是一個虛擬節點(好比第一次)而且隊列不爲空,那麼指向下一個節點繼續嘗試。

因爲隊列實際是以一個節點實例包裝實際數據存儲在隊列中的,出隊時只須要拿到實際的節點數據而不關心其依附的節點,因此出隊時其依附的節點並不必定會被移除,而僅僅是將其item置空了,下面以實際的操做來觀察隊列的變化。

第一次poll()

以上面包含A,B,C三個節點的隊列爲例,執行第一次poll操做:因爲head指向的是一個虛擬節點,其item爲null因此第一個if塊不成立,隊列不爲空第二個if也不成立,虛擬節點的next爲節點A不爲空第三個if塊也不成立,因此進入第四個if塊p指向虛擬節點的next即節點A,進入下一次循環,節點A的item不爲空,執行第一個if塊的p.casItem(item, null),假設成功,此時head指向虛擬節點,而p指向的節點A,而且A節點的next爲節點B不爲空,因此執行updateHead(h, 節點B),因此head會指向節點B,原來的head即虛擬節點1會指向自身,而節點A除了其item被置爲null沒有任何變化,最終隊列以下:

由上圖看出,poll拿走A以後,節點A並無被移除而僅僅是其item被置空,而是原head指向的虛擬節點1把next指向了自身從而脫離了隊列,新的head指向了節點B.

第二次poll()

若是再次poll:此時head指向節點B,其item不爲空,第一個if塊成立,執行第一個if塊的p.casItem(item, null),假設成功,此時head指向節點B,因此p != h不成立直接返回B,此時隊列狀態以下:

此時因爲head指向的節點B便是被出隊的節點數據,因此head並不會被更新,head這種相似tail同樣並非每次都更新的策略也同樣可以減小CAS的次數提升併發度。

第三次poll()

此時head指向item爲null的節點B,因此同第一次poll同樣,前三個if塊都不成立,第四個if塊將p定位到下一個節點C,第二次循環開始,第一個if塊知足,執行p.casItem(item, null)將節點C的item置爲null,因爲此時p指向節點C,head指向節點B,因此p !=h成立,而且節點C的next爲null,因此執行updateHead(h, 節點C);最終head指向節點C,節點B指向自身,隊列狀態以下:

上圖證實了ConcurrentLinkedQueue容許tail滯後於head,即容許從head經過succ()不能訪問到tail這一特性,以及tail節點的next能夠指向自身的特性。

offer("D")

在三次poll以後,隊列其實已經空了,而且根據上圖head指向C,tail指向的B,若是這時候咱們入隊元素D,會怎麼樣?此時tail指向節點C,節點C的next爲null,知足第一個if塊,執行p.casNext(null, newNode)將節點D掛載到節點C以後,而且由於tail指向的節點B,因此p!=t執行casTail(t, newNode)將tail指向了節點D.此時隊列的狀態以下:

此時head指向節點C,tail指向節點D.左邊的鏈表部分已經脫離了隊列變得無心義。

offer("E")

若是再次入隊一個元素E,此時tail指向節點D,節點D的next爲null,因此知足第一個if塊,執行p.casNext(null, newNode)將新節點E插入到節點D後面,而且p即節點D == t即tail,因此不會更新tail,直接返回true,最後隊列以下:

其它輔助方法

獲取但不移除peek()

 1 public E peek() {  2  restartFromHead:  3     for (;;) {  4         for (Node<E> h = head, p = h, q;;) {  5             E item = p.item;  6             //item不爲空,或者隊列爲空
 7             if (item != null || (q = p.next) == null) {  8                 updateHead(h, p); //更新head指向p,原head指向自身
 9                 return item; //返回
10             }//到這裏說明當前節點item爲空而且隊列不爲空,若是當前節點p已經被移除,從新循環
11             else if (p == q) 12                 continue restartFromHead; 13             else //到這裏說明當前節點是一個item爲空的節點,而且隊列不爲空那麼循環其下一個節點
14                 p = q; 15  } 16  } 17 }
View Code

 peek雖然僅僅是獲取但不移除節點,可是也會在返回去更新head,以上面offer("A")以後的隊列爲例,此時執行peek:head指向虛擬節點1,其item爲null,其next指向節點A不爲null,第一個if條件不成立,假設此時沒有其它線程將虛擬節點移除,天然第二個if塊也不成立,到第三個if塊使p指向節點A,繼續下一輪循環,此時p指向的節點A其item不爲null,第一個if條件成立,執行updateHead(h, p),將head指向節點A,原來的head指向自身,最後返回「A」,最終隊列的狀態以下:

此時從head不可訪問tail,若是此時繼續執行peek,那麼head指向節點A,其item不爲null,第一個if條件成立,執行updateHead(h, p);因爲updateHead裏面存在 if (h != p && casHead(h, p))這樣的判斷,此時h == p因此並不會執行更新head的操做。當隊列爲空時,peek將返回null。

Node<E> first()

另外一個first方法和peek方法其實邏輯徹底同樣,不一樣的是first返回的是包裝數據的節點,而peek返回的是節點包裝的數據,這裏就不作分析了。

final Node<E> succ(Node<E> p) 

若是p節點的next指向自身則返回head,不然返回p.next。

public int size()

返回此隊列中的元素數量,若是超過了Integer.MAX_VALUE,返回Integer.MAX_VALUE,因爲隊列的異步性,此方法返回結果並不必定正確,或者說僅僅是一個舜態值。

public boolean contains(Object o)

若是此隊列包含指定的元素,則返回true。更正式地說,當且僅當此隊列包含至少一個元素e,使得o.equals(e)時返回true。注意比較的是真正的數據而不是節點哦。

public boolean remove(Object o)

從隊列中移除一個與指定元素數據相等的實例,即便隊列中存在多個相等的也僅僅只移除一個,注意比較的是節點數據。remove方法會使對應節點的item被置爲null,而且使該節點的前一個節點的next指向被移除節點的下一個節點,即修改next跳過了這個被置空了item的節點。

public boolean addAll(Collection<? extends E> c)

將指定集合中的全部元素按照指定集合的迭代器返回的順序追加到此隊列的末尾。試圖將隊列添加到自身會致使IllegalArgumentException即c == this。該方法先把指定集合C中的元素構形成一個鏈表,最後再把這個鏈表的head連接到當前ConcurrentLinkedQueue隊列的尾部,而後更新tail指向新的尾節點。

public Object[] toArray()

按適當的順序返回包含此隊列中全部非空元素(item不爲null)的數組。該方法會分配一個新數組存儲隊列中每一個節點的item引用,因此調用者能夠隨意修改返回的數組並不會對原隊列產生任何影響。此方法充當數組和集合之間的橋樑API。

public <T> T[] toArray(T[] a)

toArray()方法不一樣在於它會嘗試將隊列中的item數據的類型轉換成指定的類型並存儲在指定的數組中,若是類型匹配而且指定的數組容量足夠的話。不然將按照指定數組的運行時類型和該隊列的大小分配一個新數組用於存儲隊列中item不爲null的元素。數組中緊跟在隊列末尾的元素將會被設置成null,所以調用者能夠經過判斷數組中出現的第一個null來判斷隊列元素已經結束。這種方法容許對輸出數組的運行時類型進行精確控制,在某些狀況下,還能夠用來節省分配成本。toArray(new Object[0])在形式上與toArray()是徹底相同的。

 

該類不能保證批量操做addAll、removeAll、retainAll、containsAll、equals和toArray的原子性。例如,與addAll操做併發操做的迭代器可能只會看到一部分添加的元素。

內存一致性影響:
與其餘併發集合同樣, 一個線程先將一個對象放入ConcurrentLinkedQueue的動做 happen-before 後來的線程從ConcurrentLinkedQueue中執行訪問或者刪除該元素的操做。

迭代器

ConcurrentLinkedQueue的迭代器在建立實例的時候就已經拿到了第一個節點以及節點item數據,每一次執行next的時候又準備好下一次迭代的返回對象,同ArrayBlockingQueue同樣,它也有一個lastRet變量用來暫時存儲當前迭代的節點,用於在it.next調用完成以後,調用it.remove()時避免刪除不該該刪除的元素。

 1 public Iterator<E> iterator() {  2     return new Itr();  3 }  4 
 5 private class Itr implements Iterator<E> {  6 
 7     /**
 8  * 下一次調用next時對應的節點  9      */
10     private Node<E> nextNode; 11 
12     /**
13  * nextItem引用item對象,由於一旦咱們聲明hasNext()中存在一個元素,咱們必須在下一個next()調用中返回它,即便它在hasNext()過程當中被刪除。 14      */
15     private E nextItem; 16 
17     /**
18  * 上一次next返回的item對應的節點,用於remove() 19      */
20     private Node<E> lastRet; 21 
22  Itr() { 23  advance(); 24  } 25 
26     /**
27  * 移動到下一個有效節點並返回調用next()時的返回值,若是沒有,則返回null。 28      */
29     private E advance() { 30         lastRet = nextNode; 31         E x = nextItem; 32 
33         Node<E> pred, p; 34         if (nextNode == null) { 35             p = first(); 36             pred = null; 37         } else { 38             pred = nextNode; 39             p = succ(nextNode); 40  } 41 
42         for (;;) { 43             if (p == null) { 44                 nextNode = null; 45                 nextItem = null; 46                 return x; 47  } 48             E item = p.item; 49             if (item != null) { 50                 nextNode = p; 51                 nextItem = item; 52                 return x; 53             } else { 54                 // 跳過item爲null的節點
55                 Node<E> next = succ(p); 56                 if (pred != null && next != null) 57                     pred.casNext(p, next);//輔助斷開無效節點
58                 p = next; 59  } 60  } 61     }
View Code

 

在建立迭代器實例的時候執行了一次advance()方法,準備好了第一個有效節點nextNode,以及其item引用,hasNext直接判斷nextNode不爲空便可,保證了迭代器的弱一致性,一旦hasNext返回true,那麼調用next必定會獲得相對應的item,即便在者之間該節點item已經被置爲空了。

 1 public boolean hasNext() {  2     return nextNode != null; //檢測的nextNode節點
 3 }  4 
 5 public E next() {  6     if (nextNode == null) throw new NoSuchElementException();  7     return advance(); //仍是調用的advance();
 8 }  9 
10 public void remove() { 11     Node<E> l = lastRet; 12     if (l == null) throw new IllegalStateException(); 13     // rely on a future traversal to relink.
14     l.item = null; //將item置爲null
15     lastRet = null; 16 }
View Code

 

next方法仍是調用的advance()方法,remove方法藉助了lastRet來將item置爲null,因爲直接操做的隊列中的節點,因此迭代器的remove會真正的將隊列中的節點item置爲空,從而影響ConcurrentLinkedQueue隊列自己。

可拆分迭代器Spliterator

ConcurrentLinkedQueue實現了本身的可拆分迭代器CLQSpliterator,從spliterator方法就能夠看到:

public Spliterator<E> spliterator() { return new CLQSpliterator<E>(this); }

可拆分迭代器的 tryAdvance、forEachRemaining、trySplit方法都是非阻塞的,tryAdvance獲取第一個item不爲空的節點數據作指定的操做,forEachRemaining循環遍歷當前迭代器中全部沒有被移除的節點數據(item不爲空)作指定的操做源碼都很簡單,就不貼代碼了,至於它的拆分方法trySplit,其實和LinkedBlockingQueue的拆分方式是同樣的,代碼都幾乎一致,它不是像ArrayBlockingQueue那樣每次分一半,而是第一次只拆一個元素,第二次拆2個,第三次拆三個,依次內推,拆分的次數越多,拆分出的新迭代器分的得元素越多,直到一個很大的數MAX_BATCH(33554432) ,後面的迭代器每次都分到這麼多的元素,拆分的實現邏輯很簡單,每一次拆分結束都記錄下拆分到哪一個元素,下一次拆分從上次結束的位置繼續往下拆分,直到沒有元素可拆分了返回null。

總結

ConcurrentLinkedQueue不少時候都是與LinkedBlockingQueue相對應的,ConcurrentLinkedQueue使用CAS實現了非阻塞的隊列操做,而不是像LinkedBlockingQueue那樣的雙鎖實現,ConcurrentLinkedQueue雖然也有head、tail節點的概念,可是不一樣於LinkedBlockingQueue,ConcurrentLinkedQueue的head並非老是指向第一個節點,tail也不必定老是指向最後一個節點,只有噹噹前指針距離第一個/最後一個節點有兩個或更多步時,纔將更新head/tail,這種減小CAS次數的設計是一種優化,總的來講它比起LinkedBlockingQueue來講,ConcurrentLinkedQueue更多的使用與多線程共享訪問同一個集合這種場景。

相關文章
相關標籤/搜索