Java多線程進階(三十)—— J.U.C之collections框架:ConcurrentLinkedDeque

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、引言

在開始講ConcurrentLinkedDeque以前,咱們先來了解下Deque這種數據結構,咱們知道Queue是一種具備FIFO特色的數據結構,元素只能在隊首進行「入隊」操做,在隊尾進行「出隊」操做。java

Deque(double-ended queue)是一種雙端隊列,也就是說能夠在任意一端進行「入隊」,也能夠在任意一端進行「出隊」:
clipboard.pngnode

Deque的數據結構示意圖以下:
clipboard.png算法

咱們再來看下JDK中QueueDeque這兩種數據結構的接口定義,看看Deque和Queue相比有哪些加強:segmentfault

Queue接口定義

Queue的接口很是簡單,一共只有三種類型的操做:入隊、出隊、讀取。api

clipboard.png

上述方法,能夠劃分以下:安全

操做類型 拋出異常 返回特殊值
入隊 add(e) offer(e)
出隊 remove() poll()
讀取 element() peek()

每種操做類型,都給出了兩種方法,區別就是其中一種操做在隊列的狀態不知足某些要求時,會拋出異常;另外一種,則直接返回特殊值(如null)。數據結構

Deque接口定義

Queue接口的全部方法Deque都具有,只不過隊首/隊尾均可以進行「出隊」和「入隊」操做:併發

操做類型 拋出異常 返回特殊值
隊首入隊 addFirst(e) offerFirst(e)
隊首出隊 removeFirst() pollFirst()
隊首讀取 getFirst() peekFirst()
隊尾入隊 addLast(e) offerLast(e)
隊尾出隊 removeLast() pollLast()
隊尾讀取 getLast() peekLast()

除此以外,Deque還能夠看成「棧」來使用,咱們知道「棧」是一種具備「LIFO」特色的數據結構(關於棧,能夠參考個人這篇博文:),Deque提供了pushpoppeek這三個棧方法,通常實現這三個方法時,能夠利用已有方法,即有以下映射關係:oracle

棧方法 Deque方法
push addFirst(e)
pop removeFirst()
peek peekFirst()

關於Deque接口的更多細節,讀者能夠參考Oracle的官方文檔:https://docs.oracle.com/javas...工具

2、ConcurrentLinkedDeque簡介

ConcurrentLinkedDeque是JDK1.7時,J.U.C包引入的一個集合工具類。在JDK1.7以前,除了Stack類外,並無其它適合併發環境的「棧」數據結構。ConcurrentLinkedDeque做爲雙端隊列,能夠看成「棧」來使用,而且高效地支持併發環境。

ConcurrentLinkedDeque和ConcurrentLinkedQueue同樣,採用了無鎖算法,底層基於自旋+CAS的方式實現。

clipboard.png

3、ConcurrentLinkedDeque原理

隊列結構

咱們先來看下ConcurrentLinkedDeque的內部結構:

public class ConcurrentLinkedDeque<E> extends AbstractCollection<E>
    implements Deque<E>, java.io.Serializable {

    /**
     * 頭指針
     */
    private transient volatile Node<E> head;

    /**
     * 尾指針
     */
    private transient volatile Node<E> tail;

    private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
    
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;

    static {
        PREV_TERMINATOR = new Node<Object>();
        PREV_TERMINATOR.next = PREV_TERMINATOR;
        NEXT_TERMINATOR = new Node<Object>();
        NEXT_TERMINATOR.prev = NEXT_TERMINATOR;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedDeque.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    /**
     * 雙鏈表結點定義
     */
    static final class Node<E> {
        volatile Node<E> prev;  // 前驅指針
        volatile E item;        // 結點值
        volatile Node<E> next;  // 後驅指針

        Node() {
        }

        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        void lazySetPrev(Node<E> val) {
            UNSAFE.putOrderedObject(this, prevOffset, val);
        }

        boolean casPrev(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long prevOffset;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev"));
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    // ...
}

能夠看到,ConcurrentLinkedDeque的內部和ConcurrentLinkedQueue相似,不過是一個雙鏈表結構,每入隊一個元素就是插入一個Node類型的結點。字段head指向隊列頭,tail指向隊列尾,經過Unsafe來CAS操做字段值以及Node對象的字段值。

clipboard.png

須要特別注意的是ConcurrentLinkedDeque包含兩個特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。
這兩個字段初始時都指向一個值爲null的空結點,這兩個字段在結點刪除時使用,後面會詳細介紹:

clipboard.png

clipboard.png


構造器定義

ConcurrentLinkedDeque包含兩種構造器:

/**
 * 空構造器.
 */
public ConcurrentLinkedDeque() {
    head = tail = new Node<E>(null);

}
/**
 * 從已有集合,構造隊列
 */
public ConcurrentLinkedDeque(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {  // 在隊尾插入元素
            t.lazySetNext(newNode);
            newNode.lazySetPrev(t);
            t = newNode;
        }
    }
    initHeadTail(h, t);
}

咱們重點看下空構造器,經過空構造器創建的ConcurrentLinkedDeque對象,其head和tail指針並不是指向null,而是指向一個item值爲null的Node結點——哨兵結點,以下圖:

clipboard.png

入隊操做

雙端隊列與普通隊列的入隊區別是:雙端隊列既能夠在「隊尾」插入元素,也能夠在「隊首」插入元素。ConcurrentLinkedDeque的入隊方法有不少:addFirst(e)addLast(e)offerFirst(e)offerLast(e)

public void addFirst(E e) {
    linkFirst(e);
}

public void addLast(E e) {
    linkLast(e);
}

public boolean offerFirst(E e) {
    linkFirst(e);
    return true;
}

public boolean offerLast(E e) {
    linkLast(e);
    return true;
}

能夠看到,隊首「入隊」其實就是調用了linkFirst(e)方法,而隊尾「入隊」是調用了 linkLast(e)方法。咱們先來看下隊首「入隊」——linkFirst(e):

/**
 * 在隊首插入一個元素.
 */
private void linkFirst(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e); // 建立待插入的結點

    restartFromHead:
    for (; ; )
        for (Node<E> h = head, p = h, q; ; ) {
            if ((q = p.prev) != null && (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                p = (h != (h = head)) ? h : q;
            else if (p.next == p) // PREV_TERMINATOR
                continue restartFromHead;
            else {
                // p is first node
                newNode.lazySetNext(p); // CAS piggyback
                if (p.casPrev(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != h) // hop two nodes at a time
                        casHead(h, newNode);  // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read prev
            }
        }
}

爲了便於理解,咱們以示例來看:假設有兩個線程ThreadA和ThreadB同時進行入隊操做。

①ThreadA先單獨入隊一個元素9

此時,ThreadA會執行CASE3分支:

else {                              // CASE3: p是隊首結點
    newNode.lazySetNext(p);         // 「新結點」的next指向隊首結點
    if (p.casPrev(null, newNode)) { // 隊首結點的prev指針指向「新結點」
        if (p != h) // hop two nodes at a time
            casHead(h, newNode);  // Failure is OK.
        return;
    }
    // 執行到此處說明CAS操做失敗,有其它線程也在隊首插入元素
}

隊列的結構以下:
clipboard.png


②ThreadA入隊一個元素2,同時ThreadB入隊一個元素10

此時,依然執行CASE3分支,咱們假設ThreadA操做成功,ThreadB操做失敗:

else {                              // CASE3: p是隊首結點
    newNode.lazySetNext(p);         // 「新結點」的next指向隊首結點
    if (p.casPrev(null, newNode)) { // 隊首結點的prev指針指向「新結點」
        if (p != h) // hop two nodes at a time
            casHead(h, newNode);  // Failure is OK.
        return;
    }
    // 執行到此處說明CAS操做失敗,有其它線程也在隊首插入元素
}

ThreadA的CAS操做成功後,會進入如下判斷:

if (p != h) // hop two nodes at a time
    casHead(h, newNode);  // Failure is OK.

上述判斷的做用就是重置head頭指針,能夠看到,ConcurrentLinkedDeque實際上是以每次跳2個結點的方式移動指針,這主要考慮到併發環境以這種hop跳的方式能夠提高效率。

此時隊列的機構以下:
clipboard.png

注意,此時ThreadB的p.casPrev(null, newNode)操做失敗了,因此會進入下一次自旋,在下一次自旋中繼續進入CASE3。若是ThreadA的casHead操做沒有完成,ThreadB就進入了下一次自旋,則會進入分支1,重置指針p指向隊首。最終隊列結構以下:

clipboard.png

在隊尾插入元素和隊首相似,再也不贅述,讀者能夠本身閱讀源碼。

出隊操做

ConcurrentLinkedDeque的出隊同樣分爲隊首、隊尾兩種狀況:removeFirst()pollFirst()removeLast()pollLast()

public E removeFirst() {
    return screenNullResult(pollFirst());
}

public E removeLast() {
    return screenNullResult(pollLast());
}

public E pollFirst() {
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

public E pollLast() {
    for (Node<E> p = last(); p != null; p = pred(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

能夠看到,兩個remove方法其實內部都調用了對應的poll方法,咱們重點看下隊尾的「出隊」——pollLast方法:

public E pollLast() {
    for (Node<E> p = last(); p != null; p = pred(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

last方法用於尋找隊尾結點,即知足p.next == null && p.prev != p的結點:

Node<E> last() {
    restartFromTail:
    for (; ; )
        for (Node<E> t = tail, p = t, q; ; ) {
            if ((q = p.next) != null &&
                (q = (p = q).next) != null)
                // Check for tail updates every other hop.
                // If p == q, we are sure to follow tail instead.
                p = (t != (t = tail)) ? t : q;
            else if (p == t
                // It is possible that p is NEXT_TERMINATOR,
                // but if so, the CAS is guaranteed to fail.
                || casTail(t, p))
                return p;
            else
                continue restartFromTail;
        }
}

pred方法用於尋找當前結點的前驅結點(若是前驅是自身,則返回隊尾結點):

final Node<E> pred(Node<E> p) {
    Node<E> q = p.prev;
    return (p == q) ? last() : q;
}

unlink方法斷開結點的連接:

/**
 * Unlinks non-null node x.
 */
void unlink(Node<E> x) {
    // assert x != null;
    // assert x.item == null;
    // assert x != PREV_TERMINATOR;
    // assert x != NEXT_TERMINATOR;

    final Node<E> prev = x.prev;
    final Node<E> next = x.next;
    if (prev == null) {
        unlinkFirst(x, next);
    } else if (next == null) {
        unlinkLast(x, prev);
    } else {
        Node<E> activePred, activeSucc;
        boolean isFirst, isLast;
        int hops = 1;

        // Find active predecessor
        for (Node<E> p = prev; ; ++hops) {
            if (p.item != null) {
                activePred = p;
                isFirst = false;
                break;
            }
            Node<E> q = p.prev;
            if (q == null) {
                if (p.next == p)
                    return;
                activePred = p;
                isFirst = true;
                break;
            } else if (p == q)
                return;
            else
                p = q;
        }

        // Find active successor
        for (Node<E> p = next; ; ++hops) {
            if (p.item != null) {
                activeSucc = p;
                isLast = false;
                break;
            }
            Node<E> q = p.next;
            if (q == null) {
                if (p.prev == p)
                    return;
                activeSucc = p;
                isLast = true;
                break;
            } else if (p == q)
                return;
            else
                p = q;
        }

        // TODO: better HOP heuristics
        if (hops < HOPS
            // always squeeze out interior deleted nodes
            && (isFirst | isLast))
            return;

        // Squeeze out deleted nodes between activePred and
        // activeSucc, including x.
        skipDeletedSuccessors(activePred);
        skipDeletedPredecessors(activeSucc);

        // Try to gc-unlink, if possible
        if ((isFirst | isLast) &&

            // Recheck expected state of predecessor and successor
            (activePred.next == activeSucc) &&
            (activeSucc.prev == activePred) &&
            (isFirst ? activePred.prev == null : activePred.item != null) &&
            (isLast ? activeSucc.next == null : activeSucc.item != null)) {

            updateHead(); // Ensure x is not reachable from head
            updateTail(); // Ensure x is not reachable from tail

            // Finally, actually gc-unlink
            x.lazySetPrev(isFirst ? prevTerminator() : x);
            x.lazySetNext(isLast ? nextTerminator() : x);
        }
    }
}
ConcurrentLinkedDeque相比ConcurrentLinkedQueue,功能更豐富,可是因爲底層結構是雙鏈表,且徹底採用CAS+自旋的無鎖算法保證線程安全性,因此須要考慮各類併發狀況,源碼比ConcurrentLinkedQueue更加難懂,留待有精力做進一步分析。

4、總結

ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法來保證線程併發訪問時的數據一致性。因爲隊列自己是一種雙鏈表結構,因此雖然算法看起來很簡單,但其實須要考慮各類併發的狀況,實現複雜度較高,而且ConcurrentLinkedDeque不具有實時的數據一致性,實際運用中,若是須要一種線程安全的棧結構,可使用ConcurrentLinkedDeque。

另外,關於ConcurrentLinkedDeque還有如下須要注意的幾點:

  1. ConcurrentLinkedDeque的迭代器是弱一致性的,這在併發容器中是比較廣泛的現象,主要是指在一個線程在遍歷隊列結點而另外一個線程嘗試對某個隊列結點進行修改的話不會拋出ConcurrentModificationException,這也就形成在遍歷某個還沒有被修改的結點時,在next方法返回時能夠看到該結點的修改,但在遍歷後再對該結點修改時就看不到這種變化。
  2. size方法須要遍歷鏈表,因此在併發狀況下,其結果不必定是準確的,只能供參考。
相關文章
相關標籤/搜索