JDK容器學習之Queue:ConcurrentLinkedQueue

併發安全的鏈表隊列 ConcurrentLinkedQueue

併發安全的鏈表隊列,主要適用於多線程環境中;底層數據結構爲鏈表,因爲隊列自己頻繁的出隊和進隊,那麼這個線程安全是如何保障html

I. 數據結構

從命名能夠基本推測底層數據結構應該是鏈表,結合源碼看下具體的鏈表節點java

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            // jdk內用於保障原子操做的輔助類
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

// 鏈表頭,注意 volatile 聲明
private transient volatile Node<E> head;

// 鏈表尾,注意 volatile 聲明
private transient volatile Node<E> tail;

從定義能夠須要注意如下幾點node

  1. Node內部只有next節點,所以底層存儲爲單向鏈表
  2. head和tail聲明爲volatile,禁止指令重排和修改對其餘線程及時可見,保障線程安全的基本前提之一
  3. sun.misc.Unsafe UNSAFE JDK內部大量使用的一個輔助類,用於保障基本的cas操做(其原理尚沒有研究,後續在併發篇中詳細探究下)

II. 線程安全保障

按照常見的線程安全保障機制,通常處理方案是對進隊和出隊操做進行加鎖,保障同一時刻只能有一個線程對隊列進行寫操做安全

然而隊列不一樣於Map,List, 出隊和進隊是比較頻繁的操做,即隊列會出現頻繁的修改,若是加鎖,性能無異會受到嚴重的影響數據結構

所以線程安全保障不是經過加鎖來實現的多線程

1. 進隊 offer

經過源碼分析,不加鎖如何實現線程安全併發

public boolean offer(E e) {
    // 隊列中不能塞null
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 死循環,確保入隊必定成功
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) { // p 爲最後一個節點,嘗試入隊
            if (p.casNext(null, newNode)) { // 將tail的next指向newNode
                // 入隊成功
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 多線程環境下,若此時另外一個線程執行了出隊操做,且此時p出隊了
            // 那麼在poll方法中的updateHead方法會將head指向當前的q,而把p.next指向本身,即:p.next == p
            // 這個時候就會形成tail在head的前面,須要從新設置p
            // 若是tail已經改變,將p指向tail,但這個時候tail依然可能在head前面
            // 若是tail沒有改變,直接將p指向head
            p = (t != (t = tail)) ? t : head;
        else
            // tail已經不是最後一個節點,將p指向最後一個節點
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

上面的實現雖然很短,在單線程環境下很好理解,就是獲取隊列尾,而後將隊列尾的next指向新的節點,並更新tail便可 (即代碼中if條件命中的邏輯)源碼分析

涉及到多線程進行併發的出隊進隊時,邏輯就沒這麼簡單了,下面進行多線程的場景區分,輔助理解性能

case1: 另外一個線程也進行入隊操做,且優先完成入隊操做this

下面圖解進行示意

  • 因爲線程B完成入隊,致使q指向新的隊尾
  • q != null, q != p
  • 進入else邏輯,執行將p指向最後一個節點,再次循環

conOffer

case2: 另外一個線程執行出隊操做,且節點p正好出隊了

下面圖解示意

  • 完成初始化以後,p,t指向tail
  • 另外一個線程執行出隊操做,正好將p出隊了,由於出隊中會執行updateHead邏輯,可能出現 p.next = p的場景
  • 此時進行q的賦值,獲得結果 q==p, 而此時指向的tail節點不會爲null(由於指向的是內存中原隊列的Node節點,真實存在),所以進入 else if邏輯

conOffer2

2. 出隊 poll

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

出隊操做,原理和入隊操做差很少,都是經過非鎖機制實現,經過CAS確保出隊和入隊自己的原子性;而爲了保證多線程的併發修改安全,在死循環中進行了各類場景的兼容

3. 隊列個數獲取

單獨拿出size方法,由於與常見的容器不一樣,ConcurrentLinkedQueuesize()方法是非併發安全,且每次都會進行掃描整個鏈表,結果以下

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

III. 應用場景&小結

  1. 底層存儲結構爲單向鏈表
  2. 出隊、入隊都是非加鎖操做,經過CAS保證出隊和入隊的原子性;爲保證併發安全,出隊和入隊放在死循環中進行,對不一樣的併發場景進行兼容
  3. size()方法非線程安全,且時間複雜度爲 O(n)
  4. 判斷隊列是否爲空,請用 isEmpty() 進行替代
  5. 由於未加鎖,出隊入隊的性能相對較好,切代碼的實現比較優雅;然實際的業務場景中,非大神級人物儘可能不要這麼玩,維護成本過高

掃描關注,java分享

QrCode

參考

相關文章
相關標籤/搜索