JDK源碼那些事兒之ConcurrentLinkedDeque

非阻塞隊列ConcurrentLinkedQueue咱們已經瞭解過了,既然是Queue,那麼是否有其雙端隊列實現呢?答案是確定的,今天就繼續說一說非阻塞雙端隊列實現ConcurrentLinkedDequehtml

前言

JDK版本號:1.8.0_171

ConcurrentLinkedDeque是一個基於鏈表實現的無界的線程安全的同時支持FIFO、LIFO非阻塞雙端隊列。操做上可類比ConcurrentLinkedQueue,利用CAS進行無鎖操做,同時經過鬆弛度閾值設置來減小CAS操做,在理解這個類前可先去參考理解我以前對ConcurrentLinkedQueue的源碼分析java

爲了說明的方便,咱們區分下,活動結點是item非null的結點,有效結點是保持着先後關係的結點,做者在註釋中解釋了將一個結點刪除分爲3個步驟:node

  • logical deletion:邏輯刪除,item置爲null,先後結點關聯關係依舊保持,此時不爲活動結點,爲有效結點
  • unlinking:有效結點能夠經過前驅和後繼指針到達其餘活動節點,可是活動節點不可到達有效結點,保證迭代器的正常,並最終可被GC回收
  • gc-unlinking:有效結點徹底孤立,爲無效結點,不可到達其餘任何活動結點,其餘結點也不可到達此結點

區分定義是便於源碼分析時的說明,參考下圖所示理解:安全

結點說明

類定義

public class ConcurrentLinkedDeque<E>
    extends AbstractCollection<E>
    implements Deque<E>, java.io.Serializable

關係圖

實現流程

爲了方便理解,這裏將ConcurrentLinkedDeque操做過程進行圖示,讓各位先有個瞭解,便於後面源碼的分析性能優化

1.new實例化操做源碼分析

new操做

2.offer("1")性能

offer1

3.offer("2")優化

offer2

4.offerFirst("3")this

offerFirst3

5.offerFirst("4")spa

offerFirst4

6.pollLast

pollLast

7.poll

poll

8.poll

poll

9.pollLast

pollLast

常量/變量

head結點(p.prev == null && p.next != p):

  • first結點能夠從head結點經過prev獲取獲得,時間複雜度O(1)
  • 全部活動結點(item != null)均可以從first結點經過succ獲取獲得
  • head結點非null
  • head結點的next不會指向本身
  • head結點不可能處於gc-unlinking階段,可是有可能處於unlinking階段
  • head結點的item可能爲空也可能不爲空
  • head結點可能不能從first或者last或者tail結點獲取獲得

tail結點(p.next == null && p.prev != p):

  • last結點能夠從tail結點經過next獲取獲得,時間複雜度O(1)
  • 全部活動結點(item != null)均可以從last結點經過pred獲取獲得
  • tail結點非null
  • tail結點不可能處於gc-unlinking階段,可是有可能處於unlinking階段
  • tail結點的item可能爲空也可能不爲空
  • tail結點可能不能從first或者last或者head結點獲取獲得

因爲head結點和tail結點不是實時更新(同ConcurrentLinkedQueue),達到鬆弛度閾值才進行更新,減小CAS操做,有可能致使head結點在tail結點以後的現象

/**
     * A node from which the first node on list (that is, the unique node p
     * with p.prev == null && p.next != p) can be reached in O(1) time.
     * Invariants:
     * - the first node is always O(1) reachable from head via prev links
     * - all live nodes are reachable from the first node via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * - head is never gc-unlinked (but may be unlinked)
     * Non-invariants:
     * - head.item may or may not be null
     * - head may not be reachable from the first or last node, or from tail
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique node p
     * with p.next == null && p.prev != p) can be reached in O(1) time.
     * Invariants:
     * - the last node is always O(1) reachable from tail via next links
     * - all live nodes are reachable from the last node via pred()
     * - tail != null
     * - tail is never gc-unlinked (but may be unlinked)
     * Non-invariants:
     * - tail.item may or may not be null
     * - tail may not be reachable from the first or last node, or from head
     */
    private transient volatile Node<E> tail;

    // 終止結點,在gc-unlinking階段將無用結點連接到這兩個結點上,自行處理減小內內存滯留風險
    private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
    
    // 刪除結點執行unlinking/gc-unlinking的閾值,當邏輯刪除結點達到閾值才觸發,算是性能優化
    private static final int HOPS = 2;
    
    // CAS
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    static {
        PREV_TERMINATOR = new Node<Object>();
        PREV_TERMINATOR.next = PREV_TERMINATOR;
        NEXT_TERMINATOR = new Node<Object>();
        NEXT_TERMINATOR.prev = NEXT_TERMINATOR;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedDeque.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

內部類

Node實現與ConcurrentLinkedQueue不一樣之處也就在於多了變量prev指向結點的前驅

static final class Node<E> {
        volatile Node<E> prev;
        volatile E item;
        volatile Node<E> next;

        Node() {  // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR
        }

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext or casPrev.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        void lazySetPrev(Node<E> val) {
            UNSAFE.putOrderedObject(this, prevOffset, val);
        }

        boolean casPrev(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long prevOffset;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                prevOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("prev"));
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

構造方法

無參構造方法建立了空結點同時頭尾結點指向這個空結點,集合參數構造時先將全部集合結點構成鏈表,最後經過initHeadTail更新鏈表head,tail便可

public ConcurrentLinkedDeque() {
        head = tail = new Node<E>(null);
    }

    public ConcurrentLinkedDeque(Collection<? extends E> c) {
        // Copy c into a private chain of Nodes
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                newNode.lazySetPrev(t);
                t = newNode;
            }
        }
        initHeadTail(h, t);
    }
    /**
     * Initializes head and tail, ensuring invariants hold.
     */
    private void initHeadTail(Node<E> h, Node<E> t) {
        if (h == t) {
            if (h == null)
                h = t = new Node<E>(null);
            else {
                // Avoid edge case of a single Node with non-null item.
                Node<E> newNode = new Node<E>(null);
                t.lazySetNext(newNode);
                newNode.lazySetPrev(t);
                t = newNode;
            }
        }
        head = h;
        tail = t;
    }

重要方法

linkFirst/linkLast

被addFirst和offerFirst所使用,將元素e添加到隊列頭部,即從頭部入隊操做。linkLast被addLast和offerLast所使用,offer,add一樣最終調用此方法完成操做,將元素e添加到隊列尾部,即從尾部入隊操做,沒什麼好說的,類比linkFirst源碼理解

private void linkFirst(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        restartFromHead:
        for (;;)
            for (Node<E> h = head, p = h, q;;) {
                // 前驅節點不爲null,前驅的前驅節點不爲null
                if ((q = p.prev) != null &&
                    (q = (p = q).prev) != null)
                    // head應該被更新了(已經超過了鬆弛度閾值)判斷是否已經更新了則p更新爲head
                    // 未更新則直接更新爲前驅的前驅結點
                    p = (h != (h = head)) ? h : q;
                // p已經出隊,沒辦法從p再繼續判斷了,沒法到達其餘結點,須要從新開始循環
                else if (p.next == p) // PREV_TERMINATOR
                    continue restartFromHead;
                else {
                    // p爲第一個結點,更新新結點next指向p
                    newNode.lazySetNext(p); // CAS piggyback
                    // 嘗試更新p的前驅指向新結點,更新失敗則從新循環更新
                    if (p.casPrev(null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this deque,
                        // and for newNode to become "live".
                        // 新結點入隊成功,頭結點已經更新了(此時的新結點距離h已經 >= 2個結點距離),嘗試更新head
                        if (p != h) // hop two nodes at a time
                            casHead(h, newNode);  // Failure is OK.
                        return;
                    }
                    // Lost CAS race to another thread; re-read prev
                }
            }
    }

unlink

被pollFirst,pollLast,removeFirstOccurrence,removeLastOccurrence和迭代器的remove所使用,移除非空結點,出隊操做和刪除時使用,主要處理處於隊列中間的結點

void unlink(Node<E> x) {
        // assert x != null;
        // assert x.item == null;
        // assert x != PREV_TERMINATOR;
        // assert x != NEXT_TERMINATOR;

        final Node<E> prev = x.prev;
        final Node<E> next = x.next;
        // 前驅爲null表示x爲頭結點
        if (prev == null) {
            unlinkFirst(x, next);
        // 後繼爲null表示x爲尾結點    
        } else if (next == null) {
            unlinkLast(x, prev);
        // 非頭尾結點表示x處於中間位置須要特殊處理    
        } else {
            
            Node<E> activePred, activeSucc;
            boolean isFirst, isLast;
            // 記錄邏輯刪除結點數
            int hops = 1;

            // Find active predecessor
            // 找到有效的前驅結點
            for (Node<E> p = prev; ; ++hops) {
                // 有效前驅結點設置
                if (p.item != null) {
                    activePred = p;
                    isFirst = false;
                    break;
                }
                Node<E> q = p.prev;
                // p是第一個結點
                if (q == null) {
                    // p已經出隊
                    if (p.next == p)
                        return;
                    // p的item爲null,next還未更新,變量設置
                    activePred = p;
                    isFirst = true;
                    break;
                }
                // p == p.prev表示p已經出隊
                else if (p == q)
                    return;
                // 繼續循環向前查找
                else
                    p = q;
            }

            // Find active successor
            // 找到有效的後繼結點
            for (Node<E> p = next; ; ++hops) {
                // 有效後繼結點設置
                if (p.item != null) {
                    activeSucc = p;
                    isLast = false;
                    break;
                }
                Node<E> q = p.next;
                // p是最後一個結點
                if (q == null) {
                    // p已經出隊
                    if (p.prev == p)
                        return;
                    // p的item爲null,prev還未更新,變量設置
                    activeSucc = p;
                    isLast = true;
                    break;
                }
                // p == p.next表示p已經出隊
                else if (p == q)
                    return;
                // 繼續循環向後查找
                else
                    p = q;
            }

            // TODO: better HOP heuristics
            // 達到邏輯刪除結點閾值或者是內部刪除結點則須要進行額外處理unlink/gc-unlink
            if (hops < HOPS
                // always squeeze out interior deleted nodes
                && (isFirst | isLast))
                return;

            // Squeeze out deleted nodes between activePred and
            // activeSucc, including x.
            // 移除有效前驅和後繼結點之間的有效結點,包括x,使得前驅和後繼互連
            skipDeletedSuccessors(activePred);
            skipDeletedPredecessors(activeSucc);

            // Try to gc-unlink, if possible
            // 有效前驅和後繼是隊頭或隊尾,嘗試gc-unlink
            if ((isFirst | isLast) &&

                // Recheck expected state of predecessor and successor
                // 檢查前驅後繼狀態,確保未改變
                (activePred.next == activeSucc) &&
                (activeSucc.prev == activePred) &&
                (isFirst ? activePred.prev == null : activePred.item != null) &&
                (isLast  ? activeSucc.next == null : activeSucc.item != null)) {
                
                // 更新head和tail 確保x不可達
                updateHead(); // Ensure x is not reachable from head
                updateTail(); // Ensure x is not reachable from tail

                // Finally, actually gc-unlink
                // 最後更新x,使得從x到活動節點不可達
                x.lazySetPrev(isFirst ? prevTerminator() : x);
                x.lazySetNext(isLast  ? nextTerminator() : x);
            }
        }
    }

unlinkFirst/unlinkLast

unlink中調用,從隊列頭將第一個非空結點出隊。unlinkLast從隊列尾將第一個非空結點出隊,代碼實現與unlinkFirst相似,可參考理解

private void unlinkFirst(Node<E> first, Node<E> next) {
        // assert first != null;
        // assert next != null;
        // assert first.item == null;
        for (Node<E> o = null, p = next, q;;) {
            // p爲活動節點或p爲最後一個節點
            if (p.item != null || (q = p.next) == null) {
                // 若是第一次循環就執行到此則不須要進行操做直接返回,p原本就是first的後繼
                // p的前驅不能指向本身,first的後繼更新成p
                if (o != null && p.prev != p && first.casNext(next, p)) {
                    // unlink階段
                    skipDeletedPredecessors(p);
                    // 檢查first和p,確保沒被更新修改才進行gc-unlink操做
                    if (first.prev == null &&
                        (p.next == null || p.item != null) &&
                        p.prev == first) {

                        updateHead(); // Ensure o is not reachable from head
                        updateTail(); // Ensure o is not reachable from tail

                        // Finally, actually gc-unlink
                        o.lazySetNext(o);
                        o.lazySetPrev(prevTerminator());
                    }
                }
                return;
            }
            // p == p.next 
            // p非活動結點同時p後繼已經指向本身則直接返回
            else if (p == q)
                return;
            // p非活動結點,p還有後繼結點,從新賦值循環處理,注意這裏o才被賦值
            else {
                o = p;
                p = q;
            }
        }
    }

updateHead/updateTail

更新head結點,確保在調用此方法以前unlinked的任何結點在該方法返回以後都不能從head訪問,不保證消除鬆弛度,僅僅是head將指向處於活動狀態的結點。updateTail更新tail結點,同updateHead,基本操做一致,只是方向不一樣而已

private final void updateHead() {
        // Either head already points to an active node, or we keep
        // trying to cas it to the first node until it does.
        // head要麼指向一個活動結點要麼嘗試指向第一個結點直到成功
        Node<E> h, p, q;
        restartFromHead:
        // head指向非活動結點同時head非第一個結點
        while ((h = head).item == null && (p = h.prev) != null) {
            for (;;) {
                // head前驅的前驅爲空或head前驅的前驅的前驅爲空
                // 即head前有1個或2個結點
                if ((q = p.prev) == null ||
                    (q = (p = q).prev) == null) {
                    // It is possible that p is PREV_TERMINATOR,
                    // but if so, the CAS is guaranteed to fail.
                    // 將head更新指向爲第一個結點
                    if (casHead(h, p))
                        return;
                    else
                        // 未成功更新說明已經被其餘線程更新了,從新循環判斷
                        continue restartFromHead;
                }
                // h前有超過2個的結點,代表當前h指向的結點已經與第一個結點距離超過2,同時h已經不指向head了,從新循環
                else if (h != head)
                    continue restartFromHead;
                // h前有超過2個的結點,同時h還指向head,則更新p爲q再次判斷,至關於p向前跳了1或2個結點位置
                else
                    p = q;
            }
        }
    }

skipDeletedPredecessors/skipDeletedSuccessors

這裏有個java語法須要注意:continue lable和break lable的做用,可下列參考代碼理解:

System.out.println("continue lable start ");
        aaa:
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                System.out.println(j);
                if(j == 1){
                    continue aaa;
                }
            }
        }
        System.out.println("continue lable end ");

        System.out.println("break lable start ");
        bbb:
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                System.out.println(j);
                if(j == 1){
                    break bbb;
                }
            }
        }
        System.out.println("break lable end ");

skipDeletedPredecessors實現將剛剛找到的後繼結點的前驅指向結點p,即完成它們的互聯,這一步就是所謂的unlinking,使隊列的活動結點沒法訪問被刪除的結點。skipDeletedSuccessors代碼邏輯同skipDeletedPredecessors,可參考理解

private void skipDeletedPredecessors(Node<E> x) {
        whileActive:
        do {
            Node<E> prev = x.prev;
            // assert prev != null;
            // assert x != NEXT_TERMINATOR;
            // assert x != PREV_TERMINATOR;
            Node<E> p = prev;
            findActive:
            for (;;) {
                // p的item非空,說明p爲活動結點,退出循環進行關聯更新操做
                if (p.item != null)
                    break findActive;
                // p的item爲空,再繼續向前查找其前驅
                Node<E> q = p.prev;
                // p的前驅結點爲空
                // 若p結點處於gc-unlinking狀態,即經過p已經沒法到達其餘活動結點,則需重頭開始繼續循環判斷
                // 上面條件不知足,則表示p結點處於unlinking狀態,還能夠到達其餘活動結點,能夠繼續被使用
                // 代表找到了有效結點,退出循環
                if (q == null) {
                    if (p.next == p)
                        continue whileActive;
                    break findActive;
                }
                // p的前驅結點非空,p.prev == p
                // 相等則代表p已經此刻的p結點處於gc-unlinking狀態,即經過p已經沒法到達其餘有效結點
                // 沒法再向前遍歷,只能重頭開始循環判斷
                else if (p == q)
                    continue whileActive;
                // 到此表示p的item爲空,p的前驅非空且不處於gc-unlinking狀態
                // 循環向前繼續判斷前驅結點
                else
                    p = q;
            }

            // found active CAS target
            // 找到活動或有效的前驅節點,前驅CAS更新成功返回不然繼續循環判斷更新
            if (prev == p || x.casPrev(prev, p))
                return;

        } while (x.item != null || x.next == null);
    }

succ/pred

找到結點的前驅或者後繼,假如當前結點已經無效結點時,則返回第一個結點或最後一個結點

/**
     * Returns the successor of p, or the first node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        // TODO: should we skip deleted nodes here?
        Node<E> q = p.next;
        return (p == q) ? first() : q;
    }

    /**
     * Returns the predecessor of p, or the last node if p.prev has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> pred(Node<E> p) {
        Node<E> q = p.prev;
        return (p == q) ? last() : q;
    }

first/last

返回第一個結點,有多是邏輯刪除結點,last操做相似

/**
     * Returns the first node, the unique node p for which:
     *     p.prev == null && p.next != p
     * The returned node may or may not be logically deleted.
     * Guarantees that head is set to the returned node.
     */
    Node<E> first() {
        restartFromHead:
        for (;;)
            for (Node<E> h = head, p = h, q;;) {
                // p的前驅和前驅的前驅都非空
                // 表示p結點以前有2個以上的活動結點
                if ((q = p.prev) != null &&
                    (q = (p = q).prev) != null)
                    // Check for head updates every other hop.
                    // If p == q, we are sure to follow head instead.
                    // 可能head已經被更新了則判斷下更新h同時更新p
                    // 或者head還未更新則直接將p指向q
                    p = (h != (h = head)) ? h : q;
                // p的前驅爲空或者前驅的前驅爲空
                // p == h 代表p的前驅爲空(第一個條件裏判斷),p就是第一個結點
                // p == h 不知足則p的前驅非空,前驅的前驅爲空,則p的前驅爲第一個結點,此時嘗試更新head並返回第一個結點
                else if (p == h
                         // It is possible that p is PREV_TERMINATOR,
                         // but if so, the CAS is guaranteed to fail.
                         || casHead(h, p))
                    return p;
                // 第二個條件中嘗試更新head失敗,則說明其餘線程更新了head,從新開始循環處理
                else
                    continue restartFromHead;
            }
    }

peekFirst/peekLast

經過first()/last()方法返回第一個或最後一個結點的值

public E peekFirst() {
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null)
                return item;
        }
        return null;
    }

    public E peekLast() {
        for (Node<E> p = last(); p != null; p = pred(p)) {
            E item = p.item;
            if (item != null)
                return item;
        }
        return null;
    }

pollFirst/pollLast

removeFirst/removeLast同pollFirst/pollLast操做,最終調用unlink,可參考上面源碼分析

public E pollFirst() {
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && p.casItem(item, null)) {
                unlink(p);
                return item;
            }
        }
        return null;
    }

    public E pollLast() {
        for (Node<E> p = last(); p != null; p = pred(p)) {
            E item = p.item;
            if (item != null && p.casItem(item, null)) {
                unlink(p);
                return item;
            }
        }
        return null;
    }

其餘操做都是基於上面的方法進行實現的,就再也不一一列舉了,可自行參考源碼理解

迭代器

迭代器和以前隊列講解的迭代器ConcurrentLinkedQueue相似,不過因爲其雙向鏈表的實現,迭代器可分爲升序迭代器(Itr)和倒序迭代器(DescendingItr),經過AbstractItr封裝公共操做方法,Itr和DescendingItr分別實現對應不一樣的方法,一個從頭節點開始向後進行遍歷,一個從尾節點向後進行遍歷,這部分和以前講解過的LinkedBlockingDeque是相似的

public Iterator<E> iterator() {
        return new Itr();
    }
    public Iterator<E> descendingIterator() {
        return new DescendingItr();
    }

主要區別方法在於兩個,經過這兩個方法來完成不一樣方向的遍歷

private class Itr extends AbstractItr {
        Node<E> startNode() { return first(); }
        Node<E> nextNode(Node<E> p) { return succ(p); }
    }

    /** Descending iterator */
    private class DescendingItr extends AbstractItr {
        Node<E> startNode() { return last(); }
        Node<E> nextNode(Node<E> p) { return pred(p); }
    }

抽象類AbstractItr涉及到的方法比較簡單,用到了前面所講解過的方法,可參考以前的分析

private abstract class AbstractItr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node returned by most recent call to next. Needed by remove.
         * Reset to null if this element is deleted by a call to remove.
         */
        private Node<E> lastRet;

        abstract Node<E> startNode();
        abstract Node<E> nextNode(Node<E> p);

        AbstractItr() {
            advance();
        }

        /**
         * Sets nextNode and nextItem to next valid node, or to null
         * if no such.
         */
        private void advance() {
            lastRet = nextNode;

            Node<E> p = (nextNode == null) ? startNode() : nextNode(nextNode);
            for (;; p = nextNode(p)) {
                if (p == null) {
                    // p might be active end or TERMINATOR node; both are OK
                    nextNode = null;
                    nextItem = null;
                    break;
                }
                E item = p.item;
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    break;
                }
            }
        }

        public boolean hasNext() {
            return nextItem != null;
        }

        public E next() {
            E item = nextItem;
            if (item == null) throw new NoSuchElementException();
            advance();
            return item;
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            l.item = null;
            unlink(l);
            lastRet = null;
        }
    }

高性能隊列-Disruptor

至此,隊列部分已基本分析完畢,除了jdk自己的隊列,還有一些比較有名的隊列實現,好比Disruptor,能夠參考美團的這篇文章進行一些深刻了解,對於隊列進行了一些底層的分析總結,比較有幫助

https://tech.meituan.com/2016...

總結

源碼已經分析完畢,咱們以pollFirst出隊操做爲例進行一個總結說明:

  • 經過first()獲取到隊列頭部的第一個結點
  • 若是爲活動結點(item非空),則將活動結點item置空,即執行logical deletion(邏輯刪除)操做
  • 繼續執行unlinking階段
  • 繼續執行gc-unlinking階段

在unlinking階段根據結點位置進行不一樣狀況的處理:

1.若是出隊的結點是隊列的第一個結點p,則執行unlinkFirst,其過程以下:

  • 找到p以後的第一個有效結點,直到最後一個結點爲止,p的後繼結點指向這個找到的結點
  • skipDeletedPredecessors完成unlinking階段,使隊列的活動結點沒法訪問被刪除的結點
  • 進行gc-unlinking階段,經過updateHead、updateTail使被刪除的結點沒法從head/tail可達,最後讓被刪除結點後繼指向本身,前驅指向終結結點

2.若是出隊的結點是隊列的最後一個結點p,則執行unlinkLast,其過程與第1種狀況相似,只是方向不一樣

3.若是出隊的結點時隊列的中間位置,則執行unlink中的一個分支代碼:

  • 先找到刪除結點x的有效前驅和有效後繼,統計中間已經處於邏輯刪除的結點個數
  • 若是統計個數已經超過閾值個數或者是內部結點刪除,有效前驅和後繼互連,即活動結點不能訪問邏輯刪除結點了(unlinking階段)
  • 有效前驅和後繼是隊頭或隊尾,嘗試進行gc-unlink,經過updateHead、updateTail使被刪除的結點沒法從head/tail可達,最後讓被刪除結點指向本身或者執行終結結點

總體處理流程已經分析完畢,其餘操做相對來講比較簡單了,須要多理解

ConcurrentLinkedDeque是ConcurrentLinkedQueue的雙端隊列實現,在刪除中涉及到了3個階段,而且因爲其無鎖CAS操做和減小CAS次數的操做,致使其實現的複雜性,須要多寫些例子理解下

以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝

相關文章
相關標籤/搜索