ConcurrentLinkedQueue是JUC中的基於鏈表的無鎖隊列實現。本文將解讀其源碼實現。java
ConcurrentLinkedQueue的實現是以Maged M. Michael和Michael L. Scott的論文Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms爲原型進行改造的,不妨閱讀此篇論文。node
下面我將論文中的介紹的入隊與出隊用接近Java語言的形式改寫並加上註釋。數據結構
enq node = new Node(value, null) loop tail = this.tail next = tail.next # 若是tail已經不是尾節點,重試循環。 if tail == this.tail # 隊列處於穩定狀態,嘗試插入節點。 if next == null # 插入新節點,將尾節點與新節點連接起來。 # 若是成功則退出循環,不然重試。 if CAS(tail.next, next, <node, next.count+1> break # 隊列處於中間狀態,推動尾節點。 else CAS (this.tail, tail, <next, tail.count+1>) # 將尾節點更新爲新插入的節點,失敗不要緊,說明其它線程更新了尾節點。 CAS(this.tail, tail, <node, tail.count+1>) deq loop head = this.head tail = this.tail next = head.next # 若是head已經不是頭節點,重試循環。 if head == this.head if head == tail # 隊列處於穩定狀態則出隊失敗。 if next == null return false # 有其它線程正在入隊,推動尾節點。 CAS(this.tail, tail, <next, tail.count+1>) else # 成功將隊列頭節點CAS到下一個節點則出隊成功,退出循環。 if CAS(this.head, head, <next, head.count+1>) break return true
因爲Java自帶垃圾回收,加上ConcurrentLinkedQueue對節點進行CAS且其內外方法都保證了節點不會複用,因此並不會出現ABA問題,所以節點不須要版本號。oop
正如典型的隊列設計,內部的節點用以下的Node類表示優化
/** * 僅展現屬性,其他略去。 */ private static class Node<E> { volatile E item; volatile Node<E> next; }
值得一提的是Node中有一個lazySetNext方法this
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
與AtomicReference類同樣,使用了UNSAFE.putOrderedObject方法來實現低延遲的寫入。這個方法會插入Store-Store內存屏障,也就是保證寫操做不會重排。而不會插入普通volatile寫會插入的Store-Load屏障。線程
ConcurrentLinkedQueue在構造時會初始化head和tail爲一個item爲null的節點,做爲哨兵節點。設計
private transient volatile Node<E> head; private transient volatile Node<E> tail;
ConcurrentLinkedQueue的源碼仍是有些晦澀難懂的,可是doc很是詳細,對閱讀源碼很是有幫助。若是帶着從doc中介紹的設計與實現思路去讀源碼會輕鬆很多。指針
ConcurrentLinkedQueue是不容許向其插入空的item的,對於刪除元素,會將其item給CAS爲null,一旦某個元素的item變爲null,就意味着它再也不是隊列中的有效元素了,而且會將已刪除節點的next指針指向自身。
這樣能夠實現儘量快地從已刪除的元素跳事後面刪除的元素,回到隊列中。rest
ConcurrentLinkedQueue具備如下這些性質:
這裏提到了succ方法,那麼先睹爲快,看一下succ方法吧。
final Node<E> succ(Node<E> p) { Node<E> next = p.next; // 若是next就是自身(表明已經不在隊列中),則返回head,不然返回next。 return (p == next) ? head : next; }
由於ConcurrentLinkedQueue中的head和tail均可能會滯後,這實際上是一種避免頻繁CAS的優化。固然過分的滯後也是會影響操做效率的,因此在具體實現的時候,會盡量能有機會更新head和tail就去更新它們。
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 若是p的next爲null,則說明此刻p爲隊列中最後一個元素。 if (q == null) { /* * cas成功則newNode成功入隊,只是此刻tail仍是老的。 * 不然說明由於線程競爭的關係沒有成功入隊,須要重試。 */ if (p.casNext(null, newNode)) { /* * t是當前線程讀到的tail快照,p是上面CAS時隊列中最後一個元素。 * 這二者不一致說明該更新tail了。 * 若是CAS失敗則說明tail已經被其它線程更新過了,這不要緊。 */ if (p != t) casTail(t, newNode); return true; } } /* * ConcurrentLinkedQueue的一個設計就是對於已經移除的元素, * 會將next置爲自己,用於判斷當前元素已經出隊,接着從head繼續遍歷(能夠看succ方法)。 * * 在整個offer方法的執行過程當中,p必定是等於t或者在t的後面的, * 所以若是p已經不在隊列中的話,t也必定不在隊列中了。 * * 因此從新讀取一次tail到快照t, * 若是t未發生變化,就從head開始繼續下去。 * 不然讓p重新的t開始繼續嘗試入隊是一個更好的選擇(此時新的t極可能在head後面) */ else if (p == q) p = (t != (t = tail)) ? t : head; else /* * 若是p與t相等,則讓p繼續向後移動一個節點。 * * 若是p和t不相等,則說明已經經歷至少兩輪循環(仍然沒有入隊), * 則從新讀取一次tail到t,若是t發生了變化,則從t開始再次嘗試入隊。 */ p = (p != t && t != (t = tail)) ? t : q; } }
public E poll() { restartFromHead: for (;;) { // p初始設置爲head。 for (Node<E> h = head, p = h, q;;) { E item = p.item; /* * 成功將item給CAS爲null則說明成功移除了元素。 * 這裏的item != null判斷也是爲了儘量避免無心義的CAS。 */ if (item != null && p.casItem(item, null)) { /* * p若是與h不相等,則說明head極可能滯後,指向已不在隊列中的元素。 * 若是此時p有後繼,則更新head爲p.next, * 不然儘管p已經被移除出去了,也只能更新head爲p了。 */ if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } /* * 若是沒能成功移除p,且p也沒有後繼,則說明p爲此時隊列的最後元素。 * 因此更新head爲p並返回null。 * * 注意這裏h和p是可能相等的,updateHead會判斷h和p是否相等以免無心義CAS。 */ else if ((q = p.next) == null) { updateHead(h, p); return null; } /* * p存在後繼,須要檢查是否p還在隊列中。 * 若是p已經不在隊列中(p==q),則從新讀一次head到快照h並讓p從h開始再嘗試移除元素。 * * 由於必定有其它線程已經經過updateHead將head從p給CAS爲新的head而且令p節點的next指向p本身, * 這時再一步步日後面走顯然不值得,不如從如今的head開始從新來過。 */ else if (p == q) continue restartFromHead; // 繼續向後走一個節點嘗試移除元素。 else p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 其實這裏的if就是將poll中的if前兩個分支作了個合併。 if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; // p爲當前節點,pred爲p前驅,next爲後繼。 for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; // item爲null表明元素已經無效(認爲不在隊列中) if (item != null) { // 不是要刪除的元素。 if (!o.equals(item)) { next = succ(p); continue; } removed = p.casItem(item, null); } next = succ(p); if (pred != null && next != null) // 前驅與後繼連上。 pred.casNext(p, next); if (removed) return true; } } return false; }
/** * size方法效率其實挺差的,是一個O(n)的遍歷。 */ public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // 最多隻返回Integer.MAX_VALUE if (++count == Integer.MAX_VALUE) break; return count; } /** * 這個方法和poll/peek方法差很少,只不過返回的是Node而不是元素。 * * 之因此peek方法沒有複用first方法的緣由有2點 * 1. 會增長一次volatile讀 * 2. 有可能會由於和poll方法的競爭,致使出現非指望的結果。 * 好比first返回的node非null,裏面的item也不是null。 * 可是等到poll方法返回從first方法拿到的node的item的時候,item已經被poll方法CAS爲null了。 * 那這個問題只能再peek中增長重試,這未免代價過高了。 * * 這就是first和peek代碼沒有複用的緣由。 */ Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }