ConcurrentLinkedQueue源碼解讀

1.簡介

ConcurrentLinkedQueue是JUC中的基於鏈表的無鎖隊列實現。本文將解讀其源碼實現。java

2. 論文

ConcurrentLinkedQueue的實現是以Maged M. Michael和Michael L. Scott的論文Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms爲原型進行改造的,不妨閱讀此篇論文。node

下面我將論文中的介紹的入隊與出隊用接近Java語言的形式改寫並加上註釋。數據結構

enq
    node = new Node(value, null)
    loop
        tail = this.tail
        next = tail.next
        # 若是tail已經不是尾節點,重試循環。
        if tail == this.tail
            # 隊列處於穩定狀態,嘗試插入節點。
            if next == null
                # 插入新節點,將尾節點與新節點連接起來。
                # 若是成功則退出循環,不然重試。
                if CAS(tail.next, next, <node, next.count+1>
                    break
            # 隊列處於中間狀態,推動尾節點。
            else
                CAS (this.tail, tail, <next, tail.count+1>)
    # 將尾節點更新爲新插入的節點,失敗不要緊,說明其它線程更新了尾節點。
    CAS(this.tail, tail, <node, tail.count+1>)

deq
    loop
        head = this.head
        tail = this.tail
        next = head.next
        # 若是head已經不是頭節點,重試循環。
        if head == this.head
            if head == tail
                # 隊列處於穩定狀態則出隊失敗。
                if next == null
                    return false
                # 有其它線程正在入隊,推動尾節點。
                CAS(this.tail, tail, <next, tail.count+1>)
            else
                # 成功將隊列頭節點CAS到下一個節點則出隊成功,退出循環。
                if CAS(this.head, head, <next, head.count+1>)
                    break
    return true

因爲Java自帶垃圾回收,加上ConcurrentLinkedQueue對節點進行CAS且其內外方法都保證了節點不會複用,因此並不會出現ABA問題,所以節點不須要版本號。oop

3. ConcurrentLinkedQueue的實現

3.1 數據結構

正如典型的隊列設計,內部的節點用以下的Node類表示優化

/**
 * 僅展現屬性,其他略去。
 */
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
}

值得一提的是Node中有一個lazySetNext方法this

void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

與AtomicReference類同樣,使用了UNSAFE.putOrderedObject方法來實現低延遲的寫入。這個方法會插入Store-Store內存屏障,也就是保證寫操做不會重排。而不會插入普通volatile寫會插入的Store-Load屏障。線程

ConcurrentLinkedQueue在構造時會初始化head和tail爲一個item爲null的節點,做爲哨兵節點。設計

private transient volatile Node<E> head;

private transient volatile Node<E> tail;

3.2 設計思想

ConcurrentLinkedQueue的源碼仍是有些晦澀難懂的,可是doc很是詳細,對閱讀源碼很是有幫助。若是帶着從doc中介紹的設計與實現思路去讀源碼會輕鬆很多。指針

ConcurrentLinkedQueue是不容許向其插入空的item的,對於刪除元素,會將其item給CAS爲null,一旦某個元素的item變爲null,就意味着它再也不是隊列中的有效元素了,而且會將已刪除節點的next指針指向自身。
這樣能夠實現儘量快地從已刪除的元素跳事後面刪除的元素,回到隊列中。rest

ConcurrentLinkedQueue具備如下這些性質:

  • 隊列中任意時刻只有最後一個元素的next爲null
  • head和tail不會是null(哨兵節點的設計)
  • head未必是隊列中第一個元素(head指向的多是一個已經被移除的元素)
  • 隊列中的有效元素均可以從head經過succ方法遍歷到
  • tail未必是隊列中最後一個元素(tail.next能夠不爲null)
  • 隊列中的最後一個元素能夠從tail經過succ方法遍歷到
  • tail甚至能夠是head的前驅

這裏提到了succ方法,那麼先睹爲快,看一下succ方法吧。

final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    // 若是next就是自身(表明已經不在隊列中),則返回head,不然返回next。
    return (p == next) ? head : next;
}

由於ConcurrentLinkedQueue中的head和tail均可能會滯後,這實際上是一種避免頻繁CAS的優化。固然過分的滯後也是會影響操做效率的,因此在具體實現的時候,會盡量能有機會更新head和tail就去更新它們。

3.3 源碼解讀

3.3.1 offer方法

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 若是p的next爲null,則說明此刻p爲隊列中最後一個元素。
        if (q == null) {
            /*
             * cas成功則newNode成功入隊,只是此刻tail仍是老的。
             * 不然說明由於線程競爭的關係沒有成功入隊,須要重試。
             */
            if (p.casNext(null, newNode)) {
                /*
                 * t是當前線程讀到的tail快照,p是上面CAS時隊列中最後一個元素。
                 * 這二者不一致說明該更新tail了。
                 * 若是CAS失敗則說明tail已經被其它線程更新過了,這不要緊。
                 */
                if (p != t) 
                    casTail(t, newNode);
                return true;
            }
        }
        /*
         * ConcurrentLinkedQueue的一個設計就是對於已經移除的元素,
         * 會將next置爲自己,用於判斷當前元素已經出隊,接着從head繼續遍歷(能夠看succ方法)。
         *
         * 在整個offer方法的執行過程當中,p必定是等於t或者在t的後面的,
         * 所以若是p已經不在隊列中的話,t也必定不在隊列中了。
         *
         * 因此從新讀取一次tail到快照t,
         * 若是t未發生變化,就從head開始繼續下去。
         * 不然讓p重新的t開始繼續嘗試入隊是一個更好的選擇(此時新的t極可能在head後面)
         */
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        else
            /*
             * 若是p與t相等,則讓p繼續向後移動一個節點。
             *
             * 若是p和t不相等,則說明已經經歷至少兩輪循環(仍然沒有入隊),
             * 則從新讀取一次tail到t,若是t發生了變化,則從t開始再次嘗試入隊。
             */
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

3.3.2 poll方法

public E poll() {
restartFromHead:
    for (;;) {
        // p初始設置爲head。
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            /* 
             * 成功將item給CAS爲null則說明成功移除了元素。
             * 這裏的item != null判斷也是爲了儘量避免無心義的CAS。
             */
            if (item != null && p.casItem(item, null)) {
                /*
                 * p若是與h不相等,則說明head極可能滯後,指向已不在隊列中的元素。
                 * 若是此時p有後繼,則更新head爲p.next,
                 * 不然儘管p已經被移除出去了,也只能更新head爲p了。
                 */
                if (p != h)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            /*
             * 若是沒能成功移除p,且p也沒有後繼,則說明p爲此時隊列的最後元素。
             * 因此更新head爲p並返回null。
             *
             * 注意這裏h和p是可能相等的,updateHead會判斷h和p是否相等以免無心義CAS。
             */
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            /*
             * p存在後繼,須要檢查是否p還在隊列中。
             * 若是p已經不在隊列中(p==q),則從新讀一次head到快照h並讓p從h開始再嘗試移除元素。
             * 
             * 由於必定有其它線程已經經過updateHead將head從p給CAS爲新的head而且令p節點的next指向p本身,
             * 這時再一步步日後面走顯然不值得,不如從如今的head開始從新來過。
             */
            else if (p == q)
                continue restartFromHead;
            // 繼續向後走一個節點嘗試移除元素。
            else
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

3.3.3 peek方法

public E peek() {
restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 其實這裏的if就是將poll中的if前兩個分支作了個合併。
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

3.3.4 remove方法

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        // p爲當前節點,pred爲p前驅,next爲後繼。
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            // item爲null表明元素已經無效(認爲不在隊列中)
            if (item != null) {
                // 不是要刪除的元素。
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
                removed = p.casItem(item, null);
            }

            next = succ(p);
            if (pred != null && next != null)
                // 前驅與後繼連上。
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

3.3.5 size方法

/**
 * size方法效率其實挺差的,是一個O(n)的遍歷。
 */
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // 最多隻返回Integer.MAX_VALUE
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}


/**
 * 這個方法和poll/peek方法差很少,只不過返回的是Node而不是元素。
 * 
 * 之因此peek方法沒有複用first方法的緣由有2點
 * 1. 會增長一次volatile讀
 * 2. 有可能會由於和poll方法的競爭,致使出現非指望的結果。
 *    好比first返回的node非null,裏面的item也不是null。
 *    可是等到poll方法返回從first方法拿到的node的item的時候,item已經被poll方法CAS爲null了。
 *    那這個問題只能再peek中增長重試,這未免代價過高了。
 *
 * 這就是first和peek代碼沒有複用的緣由。
 */
Node<E> first() {
restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
相關文章
相關標籤/搜索