java併發:非阻塞隊列之ConcurrentLinkedQueue

初始ConcurrentLinkedQueue

ConcurrentLinkedQueue 是線程安全的無界非阻塞隊列,其底層使用單向鏈表實現,對於入隊和出隊操做使用 CAS 來實現線程安全。node

其類圖以下:算法

從類圖能夠發現其與其它阻塞隊列的一個明顯區別是,ConcurrentLinkedQueue沒有實現BlockingQueue接口,因此ConcurrentLinkedQueue沒有提供具備阻塞性質的put、take等方法。安全

 

其構造函數以下:app

    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>();
    }

    /**
     * Creates a {@code ConcurrentLinkedQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
            if (h == null)
                h = t = newNode;
            else
                t.appendRelaxed(t = newNode);
        }
        if (h == null)
            h = t = new Node<E>();
        head = h;
        tail = t;
    }

解讀:函數

ConcurrentLinkedQueue維護有兩個 volatile 類型的 Node 節點分別用來存放隊列的首、尾節點。ui

建立隊列時頭、尾節點指向一個 item 爲 null 的哨兵節點。this

 

Node的定義以下:spa

    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a node holding item.  Uses relaxed write because
         * item can only be seen after piggy-backing publication via CAS.
         */
        Node(E item) {
            ITEM.set(this, item);
        }

        /** Constructs a dead dummy node. */
        Node() {}

        void appendRelaxed(Node<E> next) {
            // assert next != null;
            // assert this.next == null;
            NEXT.set(this, next);
        }

        boolean casItem(E cmp, E val) {
            // assert item == cmp || item == null;
            // assert cmp != null;
            // assert val == null;
            return ITEM.compareAndSet(this, cmp, val);
        }
    }

解讀:線程

在 Node 節點內部維護了一個使用 volatile 修飾的變量 item,用來存放節點的值;rest

next 用來存放鏈表的下一個節點,從而連接爲一個單向無界鏈表。

 

添加元素的方法以下:

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (NEXT.compareAndSet(p, null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time; failure is OK
                        TAIL.weakCompareAndSet(this, t, newNode);
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

解讀:

offer操做是在隊列末尾添加一個元素,若是傳遞的參數是 null,則拋出 NullPointerException 異常;不然因爲 ConcurrentLinkedQueue 是無界隊列,該方法將一直會返回 true(因爲使用 CAS 無阻塞算法,該方法不會阻塞調用線程)。

隊列一開始爲空時的狀態以下圖:

 

想象一下,當單個或者多個線程操做隊列可能發生的狀況以理解上述代碼中的各個分支。

 

小結:

offer操做中的關鍵步驟是經過 CAS 操做來控制某個時間只有一個線程能夠追加元素到隊列末尾。

CAS 競爭失敗的線程會經過循環一次次嘗試,直到 CAS 成功纔會返回。

這裏經過使用無限循環不斷進行 CAS 嘗試來代替阻塞算法掛起調用線程;相比阻塞算法,這是使用 CPU資源換取阻塞所帶來的開銷。

 

移除元素的方法以下:

    public E poll() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                if ((item = p.item) != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

解讀:

poll 操做是在隊列頭部獲取並移除一個元素,若是隊列爲空則返回 null。

 

Note:

ConcurrentLinkedQueue 須要遍歷鏈表來獲取 size,而不是使用原子變量(使用原子變量保存隊列元素個數須要保證入隊、出隊操做是原子性操做)。

因爲使用非阻塞 CAS 算法,沒有加鎖,因此在計算 size 時有可能進行了 offer、poll 或者 remove 操做,致使計算的元素個數不精確,因此在井髮狀況下 size 函數不是頗有用。

相關文章
相關標籤/搜索