public class ConcurrentLinkedDeque<E> extends AbstractCollection<E> implements Deque<E>, java.io.Serializable
head結點(p.prev == null && p.next != p):
tail結點(p.next == null && p.prev != p):
/** * A node from which the first node on list (that is, the unique node p * with p.prev == null && p.next != p) can be reached in O(1) time. * Invariants: * - the first node is always O(1) reachable from head via prev links * - all live nodes are reachable from the first node via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * - head is never gc-unlinked (but may be unlinked) * Non-invariants: * - head.item may or may not be null * - head may not be reachable from the first or last node, or from tail */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique node p * with p.next == null && p.prev != p) can be reached in O(1) time. * Invariants: * - the last node is always O(1) reachable from tail via next links * - all live nodes are reachable from the last node via pred() * - tail != null * - tail is never gc-unlinked (but may be unlinked) * Non-invariants: * - tail.item may or may not be null * - tail may not be reachable from the first or last node, or from head */ private transient volatile Node<E> tail; // 終止結點,在gc-unlinking階段將無用結點連接到這兩個結點上,自行處理減小內內存滯留風險 private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR; // 刪除結點執行unlinking/gc-unlinking的閾值,當邏輯刪除結點達到閾值才觸發,算是性能優化 private static final int HOPS = 2; // CAS private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { PREV_TERMINATOR = new Node<Object>(); PREV_TERMINATOR.next = PREV_TERMINATOR; NEXT_TERMINATOR = new Node<Object>(); NEXT_TERMINATOR.prev = NEXT_TERMINATOR; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedDeque.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } }
static final class Node<E> { volatile Node<E> prev; volatile E item; volatile Node<E> next; Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR } /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext or casPrev. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } void lazySetPrev(Node<E> val) { UNSAFE.putOrderedObject(this, prevOffset, val); } boolean casPrev(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long prevOffset; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; prevOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("prev")); itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
public ConcurrentLinkedDeque() { head = tail = new Node<E>(null); } public ConcurrentLinkedDeque(Collection<? extends E> c) { // Copy c into a private chain of Nodes Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); } /** * Initializes head and tail, ensuring invariants hold. */ private void initHeadTail(Node<E> h, Node<E> t) { if (h == t) { if (h == null) h = t = new Node<E>(null); else { // Avoid edge case of a single Node with non-null item. Node<E> newNode = new Node<E>(null); t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } head = h; tail = t; }
private void linkFirst(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); restartFromHead: for (;;) for (Node<E> h = head, p = h, q;;) { // 前驅節點不爲null,前驅的前驅節點不爲null if ((q = p.prev) != null && (q = (p = q).prev) != null) // head應該被更新了(已經超過了鬆弛度閾值)判斷是否已經更新了則p更新爲head // 未更新則直接更新爲前驅的前驅結點 p = (h != (h = head)) ? h : q; // p已經出隊,沒辦法從p再繼續判斷了,沒法到達其餘結點,須要從新開始循環 else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p爲第一個結點,更新新結點next指向p newNode.lazySetNext(p); // CAS piggyback // 嘗試更新p的前驅指向新結點,更新失敗則從新循環更新 if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". // 新結點入隊成功,頭結點已經更新了(此時的新結點距離h已經 >= 2個結點距離),嘗試更新head if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
void unlink(Node<E> x) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node<E> prev = x.prev; final Node<E> next = x.next; // 前驅爲null表示x爲頭結點 if (prev == null) { unlinkFirst(x, next); // 後繼爲null表示x爲尾結點 } else if (next == null) { unlinkLast(x, prev); // 非頭尾結點表示x處於中間位置須要特殊處理 } else { Node<E> activePred, activeSucc; boolean isFirst, isLast; // 記錄邏輯刪除結點數 int hops = 1; // Find active predecessor // 找到有效的前驅結點 for (Node<E> p = prev; ; ++hops) { // 有效前驅結點設置 if (p.item != null) { activePred = p; isFirst = false; break; } Node<E> q = p.prev; // p是第一個結點 if (q == null) { // p已經出隊 if (p.next == p) return; // p的item爲null,next還未更新,變量設置 activePred = p; isFirst = true; break; } // p == p.prev表示p已經出隊 else if (p == q) return; // 繼續循環向前查找 else p = q; } // Find active successor // 找到有效的後繼結點 for (Node<E> p = next; ; ++hops) { // 有效後繼結點設置 if (p.item != null) { activeSucc = p; isLast = false; break; } Node<E> q = p.next; // p是最後一個結點 if (q == null) { // p已經出隊 if (p.prev == p) return; // p的item爲null,prev還未更新,變量設置 activeSucc = p; isLast = true; break; } // p == p.next表示p已經出隊 else if (p == q) return; // 繼續循環向後查找 else p = q; } // TODO: better HOP heuristics // 達到邏輯刪除結點閾值或者是內部刪除結點則須要進行額外處理unlink/gc-unlink if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. // 移除有效前驅和後繼結點之間的有效結點,包括x,使得前驅和後繼互連 skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible // 有效前驅和後繼是隊頭或隊尾,嘗試gc-unlink if ((isFirst | isLast) && // Recheck expected state of predecessor and successor // 檢查前驅後繼狀態,確保未改變 (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { // 更新head和tail 確保x不可達 updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink // 最後更新x,使得從x到活動節點不可達 x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
private void unlinkFirst(Node<E> first, Node<E> next) { // assert first != null; // assert next != null; // assert first.item == null; for (Node<E> o = null, p = next, q;;) { // p爲活動節點或p爲最後一個節點 if (p.item != null || (q = p.next) == null) { // 若是第一次循環就執行到此則不須要進行操做直接返回,p原本就是first的後繼 // p的前驅不能指向本身,first的後繼更新成p if (o != null && p.prev != p && first.casNext(next, p)) { // unlink階段 skipDeletedPredecessors(p); // 檢查first和p,確保沒被更新修改才進行gc-unlink操做 if (first.prev == null && (p.next == null || p.item != null) && p.prev == first) { updateHead(); // Ensure o is not reachable from head updateTail(); // Ensure o is not reachable from tail // Finally, actually gc-unlink o.lazySetNext(o); o.lazySetPrev(prevTerminator()); } } return; } // p == p.next // p非活動結點同時p後繼已經指向本身則直接返回 else if (p == q) return; // p非活動結點,p還有後繼結點,從新賦值循環處理,注意這裏o才被賦值 else { o = p; p = q; } } }
private final void updateHead() { // Either head already points to an active node, or we keep // trying to cas it to the first node until it does. // head要麼指向一個活動結點要麼嘗試指向第一個結點直到成功 Node<E> h, p, q; restartFromHead: // head指向非活動結點同時head非第一個結點 while ((h = head).item == null && (p = h.prev) != null) { for (;;) { // head前驅的前驅爲空或head前驅的前驅的前驅爲空 // 即head前有1個或2個結點 if ((q = p.prev) == null || (q = (p = q).prev) == null) { // It is possible that p is PREV_TERMINATOR, // but if so, the CAS is guaranteed to fail. // 將head更新指向爲第一個結點 if (casHead(h, p)) return; else // 未成功更新說明已經被其餘線程更新了,從新循環判斷 continue restartFromHead; } // h前有超過2個的結點,代表當前h指向的結點已經與第一個結點距離超過2,同時h已經不指向head了,從新循環 else if (h != head) continue restartFromHead; // h前有超過2個的結點,同時h還指向head,則更新p爲q再次判斷,至關於p向前跳了1或2個結點位置 else p = q; } } }
這裏有個java語法須要注意:continue lable和break lable的做用,可下列參考代碼理解:
System.out.println("continue lable start "); aaa: for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { System.out.println(j); if(j == 1){ continue aaa; } } } System.out.println("continue lable end "); System.out.println("break lable start "); bbb: for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { System.out.println(j); if(j == 1){ break bbb; } } } System.out.println("break lable end ");
private void skipDeletedPredecessors(Node<E> x) { whileActive: do { Node<E> prev = x.prev; // assert prev != null; // assert x != NEXT_TERMINATOR; // assert x != PREV_TERMINATOR; Node<E> p = prev; findActive: for (;;) { // p的item非空,說明p爲活動結點,退出循環進行關聯更新操做 if (p.item != null) break findActive; // p的item爲空,再繼續向前查找其前驅 Node<E> q = p.prev; // p的前驅結點爲空 // 若p結點處於gc-unlinking狀態,即經過p已經沒法到達其餘活動結點,則需重頭開始繼續循環判斷 // 上面條件不知足,則表示p結點處於unlinking狀態,還能夠到達其餘活動結點,能夠繼續被使用 // 代表找到了有效結點,退出循環 if (q == null) { if (p.next == p) continue whileActive; break findActive; } // p的前驅結點非空,p.prev == p // 相等則代表p已經此刻的p結點處於gc-unlinking狀態,即經過p已經沒法到達其餘有效結點 // 沒法再向前遍歷,只能重頭開始循環判斷 else if (p == q) continue whileActive; // 到此表示p的item爲空,p的前驅非空且不處於gc-unlinking狀態 // 循環向前繼續判斷前驅結點 else p = q; } // found active CAS target // 找到活動或有效的前驅節點,前驅CAS更新成功返回不然繼續循環判斷更新 if (prev == p || x.casPrev(prev, p)) return; } while (x.item != null || x.next == null); }
/** * Returns the successor of p, or the first node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { // TODO: should we skip deleted nodes here? Node<E> q = p.next; return (p == q) ? first() : q; } /** * Returns the predecessor of p, or the last node if p.prev has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> pred(Node<E> p) { Node<E> q = p.prev; return (p == q) ? last() : q; }
/** * Returns the first node, the unique node p for which: * p.prev == null && p.next != p * The returned node may or may not be logically deleted. * Guarantees that head is set to the returned node. */ Node<E> first() { restartFromHead: for (;;) for (Node<E> h = head, p = h, q;;) { // p的前驅和前驅的前驅都非空 // 表示p結點以前有2個以上的活動結點 if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. // 可能head已經被更新了則判斷下更新h同時更新p // 或者head還未更新則直接將p指向q p = (h != (h = head)) ? h : q; // p的前驅爲空或者前驅的前驅爲空 // p == h 代表p的前驅爲空(第一個條件裏判斷),p就是第一個結點 // p == h 不知足則p的前驅非空,前驅的前驅爲空,則p的前驅爲第一個結點,此時嘗試更新head並返回第一個結點 else if (p == h // It is possible that p is PREV_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casHead(h, p)) return p; // 第二個條件中嘗試更新head失敗,則說明其餘線程更新了head,從新開始循環處理 else continue restartFromHead; } }
public E peekFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null) return item; } return null; } public E peekLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null) return item; } return null; }
public E pollFirst() { for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
public Iterator<E> iterator() { return new Itr(); } public Iterator<E> descendingIterator() { return new DescendingItr(); }
private class Itr extends AbstractItr { Node<E> startNode() { return first(); } Node<E> nextNode(Node<E> p) { return succ(p); } } /** Descending iterator */ private class DescendingItr extends AbstractItr { Node<E> startNode() { return last(); } Node<E> nextNode(Node<E> p) { return pred(p); } }
private abstract class AbstractItr implements Iterator<E> { /** * Next node to return item for. */ private Node<E> nextNode; /** * nextItem holds on to item fields because once we claim * that an element exists in hasNext(), we must return it in * the following next() call even if it was in the process of * being removed when hasNext() was called. */ private E nextItem; /** * Node returned by most recent call to next. Needed by remove. * Reset to null if this element is deleted by a call to remove. */ private Node<E> lastRet; abstract Node<E> startNode(); abstract Node<E> nextNode(Node<E> p); AbstractItr() { advance(); } /** * Sets nextNode and nextItem to next valid node, or to null * if no such. */ private void advance() { lastRet = nextNode; Node<E> p = (nextNode == null) ? startNode() : nextNode(nextNode); for (;; p = nextNode(p)) { if (p == null) { // p might be active end or TERMINATOR node; both are OK nextNode = null; nextItem = null; break; } E item = p.item; if (item != null) { nextNode = p; nextItem = item; break; } } } public boolean hasNext() { return nextItem != null; } public E next() { E item = nextItem; if (item == null) throw new NoSuchElementException(); advance(); return item; } public void remove() { Node<E> l = lastRet; if (l == null) throw new IllegalStateException(); l.item = null; unlink(l); lastRet = null; } }