JDK源碼那些事兒之ConcurrentLinkedQueue

阻塞隊列的實現前面已經講解完畢,今天咱們繼續瞭解源碼中非阻塞隊列的實現,接下來就看一看ConcurrentLinkedQueue非阻塞隊列是怎麼完成操做的java

前言

JDK版本號:1.8.0_171

ConcurrentLinkedQueue是一個基於鏈表實現的無界的線程安全的FIFO非阻塞隊列。最大的不一樣之處在於非阻塞特性,以前講解的阻塞隊列都會經過各類方式進行阻塞操做,在ConcurrentLinkedQueue中經過CAS操做來完成非阻塞操做。其中head和tail的更新相似以前在LinkedTransferQueue中講解的slack(鬆弛度)機制,只有在slack閾值大於等於2時纔會進行更新,儘可能減小CAS的操做次數,固然,這樣的操做也提升了代碼實現的複雜度node

類定義

從關係圖上咱們也能夠看到ConcurrentLinkedQueue沒有去實現BlockingQueue接口安全

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable

ConcurrentLinkedQueue關係圖

實現流程

爲了瞭解其內部實現的操做,能夠看下面的過程圖理解其內部結點入隊出隊的轉換過程多線程

ConcurrentLinkedQueue構造

ConcurrentLinkedQueue入隊1

ConcurrentLinkedQueue入隊2

ConcurrentLinkedQueue入隊3

ConcurrentLinkedQueue出隊1

ConcurrentLinkedQueue出隊2

ConcurrentLinkedQueue構造

ConcurrentLinkedQueue入隊4

ConcurrentLinkedQueue入隊5

常量/變量

除了CAS須要使用的常量,就只剩下head和tail兩個引用結點,在其註釋部分能夠看到做者的說明,這裏解釋下:函數

head結點:this

  • 全部未刪除結點均可以head結點經過執行succ()方法訪問
  • head結點非null
  • head結點next不能指向本身
  • head結點的item可能爲null,也可能不爲null
  • tail結點能夠落後於head結點,此時,從head結點不能訪問到tail結點

tail結點(tail的next爲null):spa

  • 隊列最後一個結點能夠經過tail結點執行succ()方法得訪問
  • tail結點非null
  • tail結點的item可能爲null,也可能不爲null
  • tail結點能夠落後於head結點,此時,從head結點不能訪問到tail結點
  • tail結點next可能指向本身,也可能不指向本身

因爲head結點和tail結點不是實時更新,達到鬆弛度閾值才進行更新,有可能致使head結點在tail結點以後的現象線程

/**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;
    
    // CAS操做
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

內部類

Node實現比較簡單,沒複雜的部分,主要是經過CAS操做進行更新變量指針

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

構造方法

無參構造方法建立了空結點同時頭尾結點指向這個空結點,集合參數構造時循環添加結點,比較簡單,主要須要理解默認無參構造函數建立時發生的變化rest

public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

重要方法

updateHead

h != p的前提條件下嘗試更新head指向到p,成功則嘗試更新原head結點指向到本身,表示結點離隊

/**
     * Tries to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

succ

獲取p結點的後繼結點,當next指向本身時,結點自己可能已經處於離隊狀態,此時返回head結點

/**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

offer

入隊操做核心方法,入隊必成功,返回爲true,表示入隊會一直嘗試操做直到成功,循環嘗試中主要分爲3種狀況:

  • 找到最後一個結點,嘗試更新next指向新結點,失敗則表示next被其餘線程更新,此時從新循環判斷,成功則判斷tail結點指向是否已經滯後一個結點以上,若是是則嘗試更新tail
  • 以前找到的最後一個結點已經離隊(p = p.next),若是tail已經被其餘線程更新則更新到tail,不然從head結點開始找到最後一個結點(由於tail能夠落後於head)
  • 非最後一個結點同時這個結點也未離隊,若是tail已經被其餘線程更新則更新到tail,不然從當前結點的next開始繼續循環
public boolean offer(E e) {
        // 判空
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        // 循環直到成功
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // p此時是最後一個結點
            if (q == null) {
                // 則開始嘗試更新p的next指向新插入的結點
                // p的next未更新成功說明next被其餘結點搶先更新了,從新循環判斷嘗試
                if (p.casNext(null, newNode)) {
                    // tail指向結點後已經添加了1個結點以上時才更新tail結點指向
                    // 即slack >= 2時才嘗試更新
                    if (p != t) // hop two nodes at a time
                        // 失敗可能被其餘線程更新了
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // p非最後一個結點,同時p = p.next則表示p自己已經離隊,須要更新p
            else if (p == q)
                // tail結點已經被更新則更新tail不然從head結點開始尋找最後一個結點
                p = (t != (t = tail)) ? t : head;
            else
                // p非最後一個結點,同時p未離隊刪除,若是tail被其餘線程更新了則p更新成新的tail,不然p更新成p.next繼續循環
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

poll

出隊操做核心方法,一直嘗試直到成功,循環嘗試中主要分爲4種狀況:

  • 找到頭結點,item非null則嘗試更新成null以表示結點已出隊,成功則判斷是否須要更新head結點返回item
  • item爲null或item已經被其餘線程獲取,同時當前結點已經爲最後一個結點,則嘗試更新頭head指向當前結點,返回null
  • 當前結點非最後一個結點,若是已經離隊則從head從新進行循環
  • 當前結點未離隊則更新到下一個結點進行循環判斷
public E poll() {
        restartFromHead:
        // 循環嘗試直到成功
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                
                // item非null則嘗試更新成null(表示結點已出隊)
                if (item != null && p.casItem(item, null)) {
                    // 出隊結點非以前的頭結點,舊頭結點h距離實際的head結點已經大於1則更新head
                    if (p != h) // hop two nodes at a time
                        // 出隊結點後無結點則嘗試更新head結點爲出隊結點自己(item = null),有結點則更新到出隊結點後的那個結點
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // item爲空或item已被其餘線程獲取
                // p結點自己爲最後一個結點,則嘗試更新head並更新原h結點指向本身,返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // p非最後一個結點,p == p.next 則表示p結點已經離隊,則跳轉restartFromHead從頭從新循環判斷
                else if (p == q)
                    continue restartFromHead;
                // p非最後一個結點,p也未離隊,則更新p指向p的下一個結點,循環判斷
                else
                    p = q;
            }
        }
    }

peek

和poll方法相似,主要在於不會對結點進行出隊操做,僅僅是獲取頭結點的item值,固然中間也可能幫助更新下head指向

public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

first

和poll方法相似,poll返回的是item這裏返回的是結點,若是是null結點(item == null),這裏判斷下直接返回null

Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

remove

從隊列中刪除元素,經過item是否爲null判斷結點是否已經離隊,是則繼續後繼判斷,casItem(item, null)成功則表示移除結點成功,失敗則表示被其餘線程出隊操做了,則繼續後繼判斷

public boolean remove(Object o) {
        if (o != null) {
            Node<E> next, pred = null;
            for (Node<E> p = first(); p != null; pred = p, p = next) {
                boolean removed = false;
                E item = p.item;
                // item判斷(非離隊結點),不知足則繼續判斷後繼結點
                if (item != null) {
                    if (!o.equals(item)) {
                        next = succ(p);
                        continue;
                    }
                    // 找到匹配結點則嘗試更新item爲null,表示當前結點已經離隊
                    removed = p.casItem(item, null);
                }
                
                // 後繼結點,到這說明匹配的結點已經刪除了(別的線程刪除或者當前線程刪除)
                next = succ(p);
                // 取消匹配結點的關聯
                if (pred != null && next != null) // unlink
                    pred.casNext(p, next);
                // 假如是當前線程刪除的結點則返回,不然繼續判斷後繼直到匹配或沒有匹配結點才返回
                if (removed)
                    return true;
            }
        }
        return false;
    }

addAll

將集合c中的元素添加到隊列中,添加到原隊列尾部相似於上面的offer方法

public boolean addAll(Collection<? extends E> c) {
        if (c == this)
            // As historically specified in AbstractQueue#addAll
            throw new IllegalArgumentException();

        // 定義兩個指針結點指向集合c的頭尾
        // 先將c改形成Node鏈表
        Node<E> beginningOfTheEnd = null, last = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (beginningOfTheEnd == null)
                beginningOfTheEnd = last = newNode;
            else {
                last.lazySetNext(newNode);
                last = newNode;
            }
        }
        if (beginningOfTheEnd == null)
            return false;

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // p爲隊列最後一個結點
            if (q == null) {
                // 將隊列與上面新建立的鏈表鏈接起來,更新失敗再循環繼續
                if (p.casNext(null, beginningOfTheEnd)) {
                    // tail更新失敗從新嘗試
                    if (!casTail(t, last)) {
                        t = tail;
                        if (last.next == null)
                            casTail(t, last);
                    }
                    return true;
                }
            }
            // p非最後一個結點且已經離隊
            else if (p == q)
                // tail結點已經被更新則更新爲tail不然從head結點開始尋找最後一個結點
                p = (t != (t = tail)) ? t : head;
            else
                // p非最後一個結點,同時p未離隊刪除,若是tail被其餘線程更新了則p更新成新的tail,不然p更新成p.next繼續循環
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

迭代器

迭代器和以前隊列講解的迭代器類似,源碼不是很複雜,同時remove方法這裏是將item置爲null,先後結點關聯關係並不會操做,防止多線程遍歷出現問題

構造方法中執行了advance()方法,提早設置好下次next執行時的結點nextNode,以及其item引用,hasNext判斷nextNode便可,保證了迭代器的弱一致性,一旦hasNext返回true,那麼調用next必定會獲得相對應的item,即便該結點item已經被置爲null

public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * next返回的Node
         */
        private Node<E> nextNode;

        /**
         * 保存next的item,防止hasNext爲true後結點被刪除再調用next獲取不到值的狀況
         */
        private E nextItem;

        /**
         * 最近一次調用next返回的結點,若是經過調用remove刪除了此元素,則重置爲null,避免刪除了不應刪除的元素
         */
        private Node<E> lastRet;

        /**
         * 構造的時候就先保存了第一次調用next返回的Node
         */
        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> pred, p;
            if (nextNode == null) {
                p = first();
                pred = null;
            } else {
                pred = nextNode;
                p = succ(nextNode);
            }

            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.item;
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // 跳過null結點
                    Node<E> next = succ(p);
                    if (pred != null && next != null)
                        pred.casNext(p, next);
                    p = next;
                }
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.item = null;
            lastRet = null;
        }
    }

總結

ConcurrentLinkedQueue是一個基於鏈表實現的無界的線程安全的FIFO非阻塞隊列,總體源碼上最主要的部分在於兩點:

  • 全程無鎖操做,無阻塞操做,使用CAS更新變量
  • head,tail結點非實時更新,在slack >= 2時才進行更新操做

結合圖解很容易理清其實現以及操做流程,相比較於以前的LinkedTransferQueue源碼算是簡單了不少

以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝

相關文章
相關標籤/搜索