JUC源碼分析-集合篇(三)ConcurrentLinkedQueue

JUC源碼分析-集合篇(三)ConcurrentLinkedQueue

在併發編程中,有時候須要使用線程安全的隊列。若是要實現一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。使用阻塞算法的隊列能夠用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不一樣的鎖)等方式來實現。非阻塞的實現方
式則可使用循環 CAS 的方式來實現。本節讓咱們一塊兒來研究一下 Doug Lea 是如何使用非阻塞的方式來實現線程安全隊列 ConcurrentLinkedQueue 的,相信從大師身上咱們能學到很多併發編程的技巧。html

  • ConcurrentLinkedQueue 先進先出(FIFO)單向隊列
  • ConcurrentLinkedDeque 雙向隊列

下面以 ConcurrentLinkedQueue 爲例看使用非阻塞算法(CAS) 保證線程安全。java

1. 數據結構

ConcurrentLinkedQueue數據結構

ConcurrentLinkedQueue 由 head 節點和 tail 節點組成,每一個節點(Node)由節點元素(item)和
指向下一個節點(next)的引用組成,節點與節點之間就是經過這個 next 關聯起來,從而組成一
張鏈表結構的隊列。默認狀況下 head 節點存儲的元素爲空,tail 節點等於 head 節點。head、tail 以及 Node.item、Node.next 都是 volatile 修辭。node

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
}

默認狀況下 head、tail 都是空節點。算法

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

獲取一個節點的後繼節點編程

// 遇到哨兵節點,從 head 開始遍歷
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

2. 入隊 offer

入隊列就是將入隊節點添加到隊列的尾部。爲了方便理解入隊時隊列的變化,以及 head 節點和 tail 節點的變化,每添加一個節點我就作了一個隊列的快照圖(注意這是單線程入隊狀況)。安全

offer入隊

第一步添加元素 e1。隊列更新 head 節點的 next 節點爲元素 e1 節點。又由於 tail 節點默認狀況下等於 head 節點,因此它們的 next 節點都指向元素 e1 節點。
第二步添加元素 e2。隊列首先設置元素 e1 節點的 next 節點爲元素 e2 節點,而後更新 tail 節點指向元素 e2 節點。
第三步添加元素 e3,設置 tail 節點的next節點爲元素 e3 節點。
第四步添加元素 e4,設置元素 e3 的 next 節點爲元素 e4 節點,而後將 tail 節點指向元素 e4 節點。數據結構

經過 debug 入隊過程並觀察 head 節點和 tail 節點的變化,發現入隊主要作兩件事情,第一是將入隊節點設置成當前隊列尾節點的下一個節點。第二是更新 tail 節點,若是 tail 節點的 next 節點不爲空,則將入隊節點設置成 tail 節點,若是 tail 節點的 next 節點爲空,則將入隊節點設置成 tail 的 next 節點,因此 tail 節點不老是尾節點,理解這一點對於咱們研究源碼會很是有幫助。併發

上面的分析讓咱們從單線程入隊的角度來理解入隊過程,可是多個線程同時進行入隊狀況就變得更加複雜,由於可能會出現其餘線程插隊的狀況。若是有一個線程正在入隊,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲入隊節點,但這時可能有另一個線程插隊了,那麼隊列的尾節點就會發生變化,這時當前線程要暫停入隊操做,而後從新獲取尾節點。讓咱們再經過源碼來詳細分析下它是如何使用CAS算法來入隊的。源碼分析

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 1. p is last node
        if (q == null) {
            // 1.1 經過自旋保證節點必定添加到數據鏈中
            if (p.casNext(null, newNode)) {
                // 1.2 p表明當前結點,當前節點不是尾節點時更新
                //     也就是說tail不必定是尾節點,尾節點爲tail或tail.next
                //     更新失敗了也不要緊,由於失敗了表示有其餘線程成功更新了tail節點
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 2. 遇到哨兵節點,從 head 開始遍歷
        //    可是若是 tail 被修改,則使用 tail(由於可能被修改正確了)
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // 3. 尾節點只多是tail或tail.next。若是tail發生變化則直接從tail開始遍歷
        else
            // Check for tail updates after two hops.
            // 其實我認爲這裏一直取p.next節點遍歷最終能夠遍歷到尾節點,能夠沒必要取從新tail
            // 可能從新取tail會遍歷更快
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

3. hops 設計意圖

上面分析咱們知道真正的尾節點可能 tail 或 tail.next,doug lea 寫的代碼和邏輯仍是稍微有點複雜。那麼可不可讓 tail 永遠指向尾節點呢?代碼以下spa

public boolean offer(E e) {
  Node n = new Node(e);
  for (;;) {
     Node</e><e> t = tail;
     if (t.casNext(null, n) && casTail(t, n)) {
        return true;
     }
  }
}

讓 tail 節點永遠做爲隊列的尾節點,這樣實現代碼量很是少,並且邏輯很是清楚和易懂。可是這麼作有個缺點就是每次都須要使用循環 CAS 更新 tail 節點,若是能減小 CAS 更新 tail 節點的次數,就能提升入隊的效率

因此 doug lea 使用 hops 變量(JDK1.8沒有直接使用hops,但邏輯沒有改變)來控制並減小 tail 節點的更新頻率,並非每次節點入隊後都將 tail 節點更新成尾節點,而是當 tail 節點和尾節點的距離大於等於常量 HOPS 的值(默認等於1)時才更新 tail 節點,tail 和尾節點的距離越長使用 CAS 更新 tail 節點的次數就會越少,可是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,由於循環體須要多循環一次來定位出尾節點,可是這樣仍然能提升入隊的效率,由於從本質上來看它經過增長對 volatile 變量的讀操做來減小了對 volatile 變量的寫操做,而對 volatile 變量的寫操做開銷要遠遠大於讀操做,因此入隊效率會有所提高。

// JDK1.7 代碼直接使用 hops 來控制
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    retry:
    for (;;) {
        Node<E> t = tail;
        Node<E> p = t;
        for (int hops = 0; ; hops++) {
            // 得到p節點的下一個節點。
            Node<E> next = succ(p);
            // next節點不爲空,說明p不是尾節點,須要更新p後在將它指向next節點
            if (next != null) {
                if (hops > HOPS && t != tail)
                    continue retry;
                p = next;
            } else if (p.casNext(null, n)) {
                if (hops >= HOPS)
                    casTail(t, n); // 更新tail節點,容許失敗
                return true;
            } else {
                p = succ(p);
            }
        }
    }
}

4. 出隊列 poll

出隊列的就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。讓咱們經過每一個節點出隊的快照來觀察下head節點的變化。

出隊poll

出隊的代碼和入隊差很少,也有 hop 的概念。出隊了完成了兩件事:一是將節點的 item 設置爲 null;二是更新頭節點並將頭節點的 next 指向本身,也就是哨兵節點。

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // 1. 出隊後 p.item 必定爲 null
            if (item != null && p.casItem(item, null)) {
                if (p != h) // hop two nodes at a time
                    // 更新頭節點並將頭節點的 next 指向本身。成爲哨兵節點,等 GC 回收
                    // 一樣容許失敗,說明其它的線程更新了頭節點
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            // 2. 遍歷到尾節點了,沒有元素了
            } else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            // 3. 出現哨兵節點,說明有其它線程poll後更新了head,須要從新從head開始遍歷
            } else if (p == q)
                continue restartFromHead;
            // 4. 繼續遍歷
            else
                p = q;
        }
    }
}

5. 其它經常使用方法

5.1 元素個數 size

// 能夠看到 size 是一個很耗時的方法
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

5.1 元素是否爲空 isEmpty

public boolean isEmpty() {
    return first() == null;
}

若是隻判斷集合中是否存在元素請使用 isEmpty

5.3 查找第一個有效元素 first

// 從 head 開始遍歷找到第一個 item!=null 的元素
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            // 要麼找到了 item!=null 的元素,要麼遍歷完整個鏈表
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            } else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

5.4 查找後繼節點 succ

// 遇到哨兵節點,從 head 開始遍歷
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

參考:

  1. 《Java併發編程的藝術》第六章
  2. 併發隊列-無界非阻塞隊列 ConcurrentLinkedQueue 原理探究(有節點變化圖)
  3. 聊聊併發(六)ConcurrentLinkedQueue的實現原理分析

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索