非阻塞隊列ConcurrentLinkedQueue咱們已經瞭解過了,既然是Queue,那麼是否有其雙端隊列實現呢?答案是確定的,今天就繼續說一說非阻塞雙端隊列實現ConcurrentLinkedDequehtml
JDK版本號:1.8.0_171
ConcurrentLinkedDeque是一個基於鏈表實現的無界的線程安全的同時支持FIFO、LIFO非阻塞雙端隊列。操做上可類比ConcurrentLinkedQueue,利用CAS進行無鎖操做,同時經過鬆弛度閾值設置來減小CAS操做,在理解這個類前可先去參考理解我以前對ConcurrentLinkedQueue的源碼分析java
爲了說明的方便,咱們區分下,活動結點是item非null的結點,有效結點是保持着先後關係的結點,做者在註釋中解釋了將一個結點刪除分爲3個步驟:node
區分定義是便於源碼分析時的說明,參考下圖所示理解:安全
public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable
爲了方便理解,這裏將ConcurrentLinkedDeque操做過程進行圖示,讓各位先有個瞭解,便於後面源碼的分析性能優化
1.new實例化操做源碼分析
2.offer("1")性能
3.offer("2")優化
4.offerFirst("3")this
5.offerFirst("4")spa
6.pollLast
7.poll
8.poll
9.pollLast
head結點(p.prev == null && p.next != p):
tail結點(p.next == null && p.prev != p):
因爲head結點和tail結點不是實時更新(同ConcurrentLinkedQueue),達到鬆弛度閾值才進行更新,減小CAS操做,有可能致使head結點在tail結點以後的現象
/** * A node from which the first node on list (that is, the unique node p * with p.prev == null && p.next != p) can be reached in O(1) time. * Invariants: * - the first node is always O(1) reachable from head via prev links * - all live nodes are reachable from the first node via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * - head is never gc-unlinked (but may be unlinked) * Non-invariants: * - head.item may or may not be null * - head may not be reachable from the first or last node, or from tail */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique node p * with p.next == null && p.prev != p) can be reached in O(1) time. * Invariants: * - the last node is always O(1) reachable from tail via next links * - all live nodes are reachable from the last node via pred() * - tail != null * - tail is never gc-unlinked (but may be unlinked) * Non-invariants: * - tail.item may or may not be null * - tail may not be reachable from the first or last node, or from head */ private transient volatile Node<E> tail; // 終止結點,在gc-unlinking階段將無用結點連接到這兩個結點上,自行處理減小內內存滯留風險 private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR; // 刪除結點執行unlinking/gc-unlinking的閾值,當邏輯刪除結點達到閾值才觸發,算是性能優化 private static final int HOPS = 2; // CAS private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { PREV_TERMINATOR = new Node<Object>(); PREV_TERMINATOR.next = PREV_TERMINATOR; NEXT_TERMINATOR = new Node<Object>(); NEXT_TERMINATOR.prev = NEXT_TERMINATOR; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedDeque.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } }
Node實現與ConcurrentLinkedQueue不一樣之處也就在於多了變量prev指向結點的前驅
static final class Node<E> { volatile Node<E> prev; volatile E item; volatile Node<E> next; Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR } /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext or casPrev. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } void lazySetPrev(Node<E> val) { UNSAFE.putOrderedObject(this, prevOffset, val); } boolean casPrev(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long prevOffset; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; prevOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("prev")); itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
無參構造方法建立了空結點同時頭尾結點指向這個空結點,集合參數構造時先將全部集合結點構成鏈表,最後經過initHeadTail更新鏈表head,tail便可
public ConcurrentLinkedDeque() { head = tail = new Node<E>(null); } public ConcurrentLinkedDeque(Collection<? extends E> c) { // Copy c into a private chain of Nodes Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); } /** * Initializes head and tail, ensuring invariants hold. */ private void initHeadTail(Node<E> h, Node<E> t) { if (h == t) { if (h == null) h = t = new Node<E>(null); else { // Avoid edge case of a single Node with non-null item. Node<E> newNode = new Node<E>(null); t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } head = h; tail = t; }
被addFirst和offerFirst所使用,將元素e添加到隊列頭部,即從頭部入隊操做。linkLast被addLast和offerLast所使用,offer,add一樣最終調用此方法完成操做,將元素e添加到隊列尾部,即從尾部入隊操做,沒什麼好說的,類比linkFirst源碼理解
private void linkFirst(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); restartFromHead: for (;;) for (Node<E> h = head, p = h, q;;) { // 前驅節點不爲null,前驅的前驅節點不爲null if ((q = p.prev) != null && (q = (p = q).prev) != null) // head應該被更新了(已經超過了鬆弛度閾值)判斷是否已經更新了則p更新爲head // 未更新則直接更新爲前驅的前驅結點 p = (h != (h = head)) ? h : q; // p已經出隊,沒辦法從p再繼續判斷了,沒法到達其餘結點,須要從新開始循環 else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p爲第一個結點,更新新結點next指向p newNode.lazySetNext(p); // CAS piggyback // 嘗試更新p的前驅指向新結點,更新失敗則從新循環更新 if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". // 新結點入隊成功,頭結點已經更新了(此時的新結點距離h已經 >= 2個結點距離),嘗試更新head if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
被pollFirst,pollLast,removeFirstOccurrence,removeLastOccurrence和迭代器的remove所使用,移除非空結點,出隊操做和刪除時使用,主要處理處於隊列中間的結點
void unlink(Node<E> x) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node<E> prev = x.prev; final Node<E> next = x.next; // 前驅爲null表示x爲頭結點 if (prev == null) { unlinkFirst(x, next); // 後繼爲null表示x爲尾結點 } else if (next == null) { unlinkLast(x, prev); // 非頭尾結點表示x處於中間位置須要特殊處理 } else { Node<E> activePred, activeSucc; boolean isFirst, isLast; // 記錄邏輯刪除結點數 int hops = 1; // Find active predecessor // 找到有效的前驅結點 for (Node<E> p = prev; ; ++hops) { // 有效前驅結點設置 if (p.item != null) { activePred = p; isFirst = false; break; } Node<E> q = p.prev; // p是第一個結點 if (q == null) { // p已經出隊 if (p.next == p) return; // p的item爲null,next還未更新,變量設置 activePred = p; isFirst = true; break; } // p == p.prev表示p已經出隊 else if (p == q) return; // 繼續循環向前查找 else p = q; } // Find active successor // 找到有效的後繼結點 for (Node<E> p = next; ; ++hops) { // 有效後繼結點設置 if (p.item != null) { activeSucc = p; isLast = false; break; } Node<E> q = p.next; // p是最後一個結點 if (q == null) { // p已經出隊 if (p.prev == p) return; // p的item爲null,prev還未更新,變量設置 activeSucc = p; isLast = true; break; } // p == p.next表示p已經出隊 else if (p == q) return; // 繼續循環向後查找 else p = q; } // TODO: better HOP heuristics // 達到邏輯刪除結點閾值或者是內部刪除結點則須要進行額外處理unlink/gc-unlink if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. // 移除有效前驅和後繼結點之間的有效結點,包括x,使得前驅和後繼互連 skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible // 有效前驅和後繼是隊頭或隊尾,嘗試gc-unlink if ((isFirst | isLast) && // Recheck expected state of predecessor and successor // 檢查前驅後繼狀態,確保未改變 (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { // 更新head和tail 確保x不可達 updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink // 最後更新x,使得從x到活動節點不可達 x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
unlink中調用,從隊列頭將第一個非空結點出隊。unlinkLast從隊列尾將第一個非空結點出隊,代碼實現與unlinkFirst相似,可參考理解
private void unlinkFirst(Node<E> first, Node<E> next) { // assert first != null; // assert next != null; // assert first.item == null; for (Node<E> o = null, p = next, q;;) { // p爲活動節點或p爲最後一個節點 if (p.item != null || (q = p.next) == null) { // 若是第一次循環就執行到此則不須要進行操做直接返回,p原本就是first的後繼 // p的前驅不能指向本身,first的後繼更新成p if (o != null && p.prev != p && first.casNext(next, p)) { // unlink階段 skipDeletedPredecessors(p); // 檢查first和p,確保沒被更新修改才進行gc-unlink操做 if (first.prev == null && (p.next == null || p.item != null) && p.prev == first) { updateHead(); // Ensure o is not reachable from head updateTail(); // Ensure o is not reachable from tail // Finally, actually gc-unlink o.lazySetNext(o); o.lazySetPrev(prevTerminator()); } } return; } // p == p.next // p非活動結點同時p後繼已經指向本身則直接返回 else if (p == q) return; // p非活動結點,p還有後繼結點,從新賦值循環處理,注意這裏o才被賦值 else { o = p; p = q; } } }
更新head結點,確保在調用此方法以前unlinked的任何結點在該方法返回以後都不能從head訪問,不保證消除鬆弛度,僅僅是head將指向處於活動狀態的結點。updateTail更新tail結點,同updateHead,基本操做一致,只是方向不一樣而已
private final void updateHead() { // Either head already points to an active node, or we keep // trying to cas it to the first node until it does. // head要麼指向一個活動結點要麼嘗試指向第一個結點直到成功 Node<E> h, p, q; restartFromHead: // head指向非活動結點同時head非第一個結點 while ((h = head).item == null && (p = h.prev) != null) { for (;;) { // head前驅的前驅爲空或head前驅的前驅的前驅爲空 // 即head前有1個或2個結點 if ((q = p.prev) == null || (q = (p = q).prev) == null) { // It is possible that p is PREV_TERMINATOR, // but if so, the CAS is guaranteed to fail. // 將head更新指向爲第一個結點 if (casHead(h, p)) return; else // 未成功更新說明已經被其餘線程更新了,從新循環判斷 continue restartFromHead; } // h前有超過2個的結點,代表當前h指向的結點已經與第一個結點距離超過2,同時h已經不指向head了,從新循環 else if (h != head) continue restartFromHead; // h前有超過2個的結點,同時h還指向head,則更新p爲q再次判斷,至關於p向前跳了1或2個結點位置 else p = q; } } }
這裏有個java語法須要注意:continue lable和break lable的做用,可下列參考代碼理解:
System.out.println("continue lable start "); aaa: for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { System.out.println(j); if(j == 1){ continue aaa; } } } System.out.println("continue lable end "); System.out.println("break lable start "); bbb: for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { System.out.println(j); if(j == 1){ break bbb; } } } System.out.println("break lable end ");
skipDeletedPredecessors實現將剛剛找到的後繼結點的前驅指向結點p,即完成它們的互聯,這一步就是所謂的unlinking,使隊列的活動結點沒法訪問被刪除的結點。skipDeletedSuccessors代碼邏輯同skipDeletedPredecessors,可參考理解
private void skipDeletedPredecessors(Node<E> x) { whileActive: do { Node<E> prev = x.prev; // assert prev != null; // assert x != NEXT_TERMINATOR; // assert x != PREV_TERMINATOR; Node<E> p = prev; findActive: for (;;) { // p的item非空,說明p爲活動結點,退出循環進行關聯更新操做 if (p.item != null) break findActive; // p的item爲空,再繼續向前查找其前驅 Node<E> q = p.prev; // p的前驅結點爲空 // 若p結點處於gc-unlinking狀態,即經過p已經沒法到達其餘活動結點,則需重頭開始繼續循環判斷 // 上面條件不知足,則表示p結點處於unlinking狀態,還能夠到達其餘活動結點,能夠繼續被使用 // 代表找到了有效結點,退出循環 if (q == null) { if (p.next == p) continue whileActive; break findActive; } // p的前驅結點非空,p.prev == p // 相等則代表p已經此刻的p結點處於gc-unlinking狀態,即經過p已經沒法到達其餘有效結點 // 沒法再向前遍歷,只能重頭開始循環判斷 else if (p == q) continue whileActive; // 到此表示p的item爲空,p的前驅非空且不處於gc-unlinking狀態 // 循環向前繼續判斷前驅結點 else p = q; } // found active CAS target // 找到活動或有效的前驅節點,前驅CAS更新成功返回不然繼續循環判斷更新 if (prev == p || x.casPrev(prev, p)) return; } while (x.item != null || x.next == null); }
找到結點的前驅或者後繼,假如當前結點已經無效結點時,則返回第一個結點或最後一個結點
/** * Returns the successor of p, or the first node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { // TODO: should we skip deleted nodes here? Node<E> q = p.next; return (p == q) ? first() : q; } /** * Returns the predecessor of p, or the last node if p.prev has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> pred(Node<E> p) { Node<E> q = p.prev; return (p == q) ? last() : q; }
返回第一個結點,有多是邏輯刪除結點,last操做相似
/** * Returns the first node, the unique node p for which: * p.prev == null && p.next != p * The returned node may or may not be logically deleted. * Guarantees that head is set to the returned node. */ Node<E> first() { restartFromHead: for (;;) for (Node<E> h = head, p = h, q;;) { // p的前驅和前驅的前驅都非空 // 表示p結點以前有2個以上的活動結點 if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. // 可能head已經被更新了則判斷下更新h同時更新p // 或者head還未更新則直接將p指向q p = (h != (h = head)) ? h : q; // p的前驅爲空或者前驅的前驅爲空 // p == h 代表p的前驅爲空(第一個條件裏判斷),p就是第一個結點 // p == h 不知足則p的前驅非空,前驅的前驅爲空,則p的前驅爲第一個結點,此時嘗試更新head並返回第一個結點 else if (p == h // It is possible that p is PREV_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casHead(h, p)) return p; // 第二個條件中嘗試更新head失敗,則說明其餘線程更新了head,從新開始循環處理 else continue restartFromHead; } }
經過first()/last()方法返回第一個或最後一個結點的值
public E peekFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null) return item; } return null; } public E peekLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null) return item; } return null; }
removeFirst/removeLast同pollFirst/pollLast操做,最終調用unlink,可參考上面源碼分析
public E pollFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
其餘操做都是基於上面的方法進行實現的,就再也不一一列舉了,可自行參考源碼理解
迭代器和以前隊列講解的迭代器ConcurrentLinkedQueue相似,不過因爲其雙向鏈表的實現,迭代器可分爲升序迭代器(Itr)和倒序迭代器(DescendingItr),經過AbstractItr封裝公共操做方法,Itr和DescendingItr分別實現對應不一樣的方法,一個從頭節點開始向後進行遍歷,一個從尾節點向後進行遍歷,這部分和以前講解過的LinkedBlockingDeque是相似的
public Iterator<E> iterator() { return new Itr(); } public Iterator<E> descendingIterator() { return new DescendingItr(); }
主要區別方法在於兩個,經過這兩個方法來完成不一樣方向的遍歷
private class Itr extends AbstractItr { Node<E> startNode() { return first(); } Node<E> nextNode(Node<E> p) { return succ(p); } } /** Descending iterator */ private class DescendingItr extends AbstractItr { Node<E> startNode() { return last(); } Node<E> nextNode(Node<E> p) { return pred(p); } }
抽象類AbstractItr涉及到的方法比較簡單,用到了前面所講解過的方法,可參考以前的分析
private abstract class AbstractItr implements Iterator<E> { /** * Next node to return item for. */ private Node<E> nextNode; /** * nextItem holds on to item fields because once we claim * that an element exists in hasNext(), we must return it in * the following next() call even if it was in the process of * being removed when hasNext() was called. */ private E nextItem; /** * Node returned by most recent call to next. Needed by remove. * Reset to null if this element is deleted by a call to remove. */ private Node<E> lastRet; abstract Node<E> startNode(); abstract Node<E> nextNode(Node<E> p); AbstractItr() { advance(); } /** * Sets nextNode and nextItem to next valid node, or to null * if no such. */ private void advance() { lastRet = nextNode; Node<E> p = (nextNode == null) ? startNode() : nextNode(nextNode); for (;; p = nextNode(p)) { if (p == null) { // p might be active end or TERMINATOR node; both are OK nextNode = null; nextItem = null; break; } E item = p.item; if (item != null) { nextNode = p; nextItem = item; break; } } } public boolean hasNext() { return nextItem != null; } public E next() { E item = nextItem; if (item == null) throw new NoSuchElementException(); advance(); return item; } public void remove() { Node<E> l = lastRet; if (l == null) throw new IllegalStateException(); l.item = null; unlink(l); lastRet = null; } }
至此,隊列部分已基本分析完畢,除了jdk自己的隊列,還有一些比較有名的隊列實現,好比Disruptor,能夠參考美團的這篇文章進行一些深刻了解,對於隊列進行了一些底層的分析總結,比較有幫助
https://tech.meituan.com/2016...
源碼已經分析完畢,咱們以pollFirst出隊操做爲例進行一個總結說明:
在unlinking階段根據結點位置進行不一樣狀況的處理:
1.若是出隊的結點是隊列的第一個結點p,則執行unlinkFirst,其過程以下:
2.若是出隊的結點是隊列的最後一個結點p,則執行unlinkLast,其過程與第1種狀況相似,只是方向不一樣
3.若是出隊的結點時隊列的中間位置,則執行unlink中的一個分支代碼:
總體處理流程已經分析完畢,其餘操做相對來講比較簡單了,須要多理解
ConcurrentLinkedDeque是ConcurrentLinkedQueue的雙端隊列實現,在刪除中涉及到了3個階段,而且因爲其無鎖CAS操做和減小CAS次數的操做,致使其實現的複雜性,須要多寫些例子理解下
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝