本文首發於一世流雲專欄: https://segmentfault.com/blog...
在開始講ConcurrentLinkedDeque
以前,咱們先來了解下Deque這種數據結構,咱們知道Queue是一種具備FIFO特色的數據結構,元素只能在隊首進行「入隊」操做,在隊尾進行「出隊」操做。java
而Deque(double-ended queue)是一種雙端隊列,也就是說能夠在任意一端進行「入隊」,也能夠在任意一端進行「出隊」:node
Deque的數據結構示意圖以下:算法
咱們再來看下JDK中Queue和Deque這兩種數據結構的接口定義,看看Deque和Queue相比有哪些加強:segmentfault
Queue的接口很是簡單,一共只有三種類型的操做:入隊、出隊、讀取。api
上述方法,能夠劃分以下:安全
操做類型 | 拋出異常 | 返回特殊值 |
---|---|---|
入隊 | add(e) | offer(e) |
出隊 | remove() | poll() |
讀取 | element() | peek() |
每種操做類型,都給出了兩種方法,區別就是其中一種操做在隊列的狀態不知足某些要求時,會拋出異常;另外一種,則直接返回特殊值(如null)。數據結構
Queue接口的全部方法Deque都具有,只不過隊首/隊尾均可以進行「出隊」和「入隊」操做:併發
操做類型 | 拋出異常 | 返回特殊值 |
---|---|---|
隊首入隊 | addFirst(e) | offerFirst(e) |
隊首出隊 | removeFirst() | pollFirst() |
隊首讀取 | getFirst() | peekFirst() |
隊尾入隊 | addLast(e) | offerLast(e) |
隊尾出隊 | removeLast() | pollLast() |
隊尾讀取 | getLast() | peekLast() |
除此以外,Deque還能夠看成「棧」來使用,咱們知道「棧」是一種具備「LIFO」特色的數據結構(關於棧,能夠參考個人這篇博文:棧),Deque提供了push
、pop
、peek
這三個棧方法,通常實現這三個方法時,能夠利用已有方法,即有以下映射關係:oracle
棧方法 | Deque方法 |
---|---|
push | addFirst(e) |
pop | removeFirst() |
peek | peekFirst() |
關於Deque接口的更多細節,讀者能夠參考Oracle的官方文檔:https://docs.oracle.com/javas...工具
ConcurrentLinkedDeque
是JDK1.7時,J.U.C包引入的一個集合工具類。在JDK1.7以前,除了Stack類外,並無其它適合併發環境的「棧」數據結構。ConcurrentLinkedDeque做爲雙端隊列,能夠看成「棧」來使用,而且高效地支持併發環境。
ConcurrentLinkedDeque和ConcurrentLinkedQueue同樣,採用了無鎖算法,底層基於自旋+CAS的方式實現。
咱們先來看下ConcurrentLinkedDeque的內部結構:
public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable { /** * 頭指針 */ private transient volatile Node<E> head; /** * 尾指針 */ private transient volatile Node<E> tail; private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { PREV_TERMINATOR = new Node<Object>(); PREV_TERMINATOR.next = PREV_TERMINATOR; NEXT_TERMINATOR = new Node<Object>(); NEXT_TERMINATOR.prev = NEXT_TERMINATOR; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedDeque.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } /** * 雙鏈表結點定義 */ static final class Node<E> { volatile Node<E> prev; // 前驅指針 volatile E item; // 結點值 volatile Node<E> next; // 後驅指針 Node() { } Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } void lazySetPrev(Node<E> val) { UNSAFE.putOrderedObject(this, prevOffset, val); } boolean casPrev(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long prevOffset; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev")); itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } // ... }
能夠看到,ConcurrentLinkedDeque的內部和ConcurrentLinkedQueue相似,不過是一個雙鏈表結構,每入隊一個元素就是插入一個Node類型的結點。字段head
指向隊列頭,tail
指向隊列尾,經過Unsafe來CAS操做字段值以及Node對象的字段值。
須要特別注意的是ConcurrentLinkedDeque包含兩個特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。
這兩個字段初始時都指向一個值爲null的空結點,這兩個字段在結點刪除時使用,後面會詳細介紹:
ConcurrentLinkedDeque包含兩種構造器:
/** * 空構造器. */ public ConcurrentLinkedDeque() { head = tail = new Node<E>(null); }
/** * 從已有集合,構造隊列 */ public ConcurrentLinkedDeque(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { // 在隊尾插入元素 t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); }
咱們重點看下空構造器,經過空構造器創建的ConcurrentLinkedDeque對象,其head和tail指針並不是指向null,而是指向一個item值爲null的Node結點——哨兵結點,以下圖:
雙端隊列與普通隊列的入隊區別是:雙端隊列既能夠在「隊尾」插入元素,也能夠在「隊首」插入元素。ConcurrentLinkedDeque的入隊方法有不少:addFirst(e)
、addLast(e)
、offerFirst(e)
、offerLast(e)
:
public void addFirst(E e) { linkFirst(e); } public void addLast(E e) { linkLast(e); } public boolean offerFirst(E e) { linkFirst(e); return true; } public boolean offerLast(E e) { linkLast(e); return true; }
能夠看到,隊首「入隊」其實就是調用了linkFirst(e)
方法,而隊尾「入隊」是調用了 linkLast(e)方法。咱們先來看下隊首「入隊」——linkFirst(e):
/** * 在隊首插入一個元素. */ private void linkFirst(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 建立待插入的結點 restartFromHead: for (; ; ) for (Node<E> h = head, p = h, q; ; ) { if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. p = (h != (h = head)) ? h : q; else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p is first node newNode.lazySetNext(p); // CAS piggyback if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
爲了便於理解,咱們以示例來看:假設有兩個線程ThreadA和ThreadB同時進行入隊操做。
①ThreadA先單獨入隊一個元素9
此時,ThreadA會執行CASE3分支:
else { // CASE3: p是隊首結點 newNode.lazySetNext(p); // 「新結點」的next指向隊首結點 if (p.casPrev(null, newNode)) { // 隊首結點的prev指針指向「新結點」 if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 執行到此處說明CAS操做失敗,有其它線程也在隊首插入元素 }
隊列的結構以下:
②ThreadA入隊一個元素2,同時ThreadB入隊一個元素10
此時,依然執行CASE3分支,咱們假設ThreadA操做成功,ThreadB操做失敗:
else { // CASE3: p是隊首結點 newNode.lazySetNext(p); // 「新結點」的next指向隊首結點 if (p.casPrev(null, newNode)) { // 隊首結點的prev指針指向「新結點」 if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 執行到此處說明CAS操做失敗,有其它線程也在隊首插入元素 }
ThreadA的CAS操做成功後,會進入如下判斷:
if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK.
上述判斷的做用就是重置head頭指針,能夠看到,ConcurrentLinkedDeque實際上是以每次跳2個結點的方式移動指針,這主要考慮到併發環境以這種hop跳的方式能夠提高效率。
此時隊列的機構以下:
注意,此時ThreadB的p.casPrev(null, newNode)
操做失敗了,因此會進入下一次自旋,在下一次自旋中繼續進入CASE3。若是ThreadA的casHead操做沒有完成,ThreadB就進入了下一次自旋,則會進入分支1,重置指針p指向隊首。最終隊列結構以下:
在隊尾插入元素和隊首相似,再也不贅述,讀者能夠本身閱讀源碼。
ConcurrentLinkedDeque的出隊同樣分爲隊首、隊尾兩種狀況:removeFirst()
、pollFirst()
、removeLast()
、pollLast()
。
public E removeFirst() { return screenNullResult(pollFirst()); } public E removeLast() { return screenNullResult(pollLast()); } public E pollFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
能夠看到,兩個remove方法其實內部都調用了對應的poll方法,咱們重點看下隊尾的「出隊」——pollLast方法:
public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
last方法用於尋找隊尾結點,即知足p.next == null && p.prev != p
的結點:
Node<E> last() { restartFromTail: for (; ; ) for (Node<E> t = tail, p = t, q; ; ) { if ((q = p.next) != null && (q = (p = q).next) != null) // Check for tail updates every other hop. // If p == q, we are sure to follow tail instead. p = (t != (t = tail)) ? t : q; else if (p == t // It is possible that p is NEXT_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casTail(t, p)) return p; else continue restartFromTail; } }
pred方法用於尋找當前結點的前驅結點(若是前驅是自身,則返回隊尾結點):
final Node<E> pred(Node<E> p) { Node<E> q = p.prev; return (p == q) ? last() : q; }
unlink方法斷開結點的連接:
/** * Unlinks non-null node x. */ void unlink(Node<E> x) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node<E> prev = x.prev; final Node<E> next = x.next; if (prev == null) { unlinkFirst(x, next); } else if (next == null) { unlinkLast(x, prev); } else { Node<E> activePred, activeSucc; boolean isFirst, isLast; int hops = 1; // Find active predecessor for (Node<E> p = prev; ; ++hops) { if (p.item != null) { activePred = p; isFirst = false; break; } Node<E> q = p.prev; if (q == null) { if (p.next == p) return; activePred = p; isFirst = true; break; } else if (p == q) return; else p = q; } // Find active successor for (Node<E> p = next; ; ++hops) { if (p.item != null) { activeSucc = p; isLast = false; break; } Node<E> q = p.next; if (q == null) { if (p.prev == p) return; activeSucc = p; isLast = true; break; } else if (p == q) return; else p = q; } // TODO: better HOP heuristics if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible if ((isFirst | isLast) && // Recheck expected state of predecessor and successor (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
ConcurrentLinkedDeque相比ConcurrentLinkedQueue,功能更豐富,可是因爲底層結構是雙鏈表,且徹底採用CAS+自旋的無鎖算法保證線程安全性,因此須要考慮各類併發狀況,源碼比ConcurrentLinkedQueue更加難懂,留待有精力做進一步分析。
ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法來保證線程併發訪問時的數據一致性。因爲隊列自己是一種雙鏈表結構,因此雖然算法看起來很簡單,但其實須要考慮各類併發的狀況,實現複雜度較高,而且ConcurrentLinkedDeque不具有實時的數據一致性,實際運用中,若是須要一種線程安全的棧結構,可使用ConcurrentLinkedDeque。
另外,關於ConcurrentLinkedDeque還有如下須要注意的幾點: