聊聊 JDK 非阻塞隊列源碼(CAS實現)

正如上篇文章聊聊 JDK 阻塞隊列源碼(ReentrantLock實現)所說,隊列在咱們現實生活中隊列隨處可見,最經典的就是去銀行辦理業務,超市買東西排隊等。今天樓主要講的就是JDK中安全隊列的另外一種實現使用CAS算法實現的安全隊列。
html

JDK 中的隊列

JDK中的隊列都實現了 java.util.Queue 接口,下面就是樓主要說的無鎖版本的隊列實現:java

隊列名字 是否加鎖 數據結構
LinkedTransferQueue 鏈表
ConcurrentLinkedQueue 鏈表

LinkedTransferQueue 源碼分析

LinkedTransferQueue 的原理就是經過使用原子變量compare and swap(簡稱「CAS」)這種不加鎖的方式來實現的進行併發控制,LinkedTransferQueue是一個無界的安全隊列,其長度能夠無限延伸,固然其帶來的問題也是顯而易見的。
node

主要方法源碼實現

  1. add:添加元素到隊列裏,添加成功返回true;
  2. offer:添加元素到隊列裏,添加成功返回true,添加失敗返回false;
  3. put:添加元素到隊列裏,若是容量滿了會阻塞直到容量不滿;
  4. poll:刪除隊列頭部元素,若是隊列爲空,返回null。不然返回元素;
  5. take:刪除隊列頭部元素,若是隊列爲空,一直阻塞到隊列有元素並刪除。

add方法:
算法

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

offer方法:
安全

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

poll方法:
數據結構

public E poll() {
    return xfer(null, false, NOW, 0);
}

take方法:
併發

public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

從上面代碼中能夠看出,這些方法最終都指向了 xfer 方法,只不過傳入的不一樣的參數。app

/**
 * Implements all queuing methods. See above for explanation.
 *
 * @param e the item or null for take
 * @param haveData true if this is a put, else a take
 * @param how NOW, ASYNC, SYNC, or TIMED
 * @param nanos timeout in nanosecs, used only if mode is TIMED
 * @return an item if matched, else e
 * @throws NullPointerException if haveData mode but e is null
 */

從源碼的 doc 註釋中能夠知道
第一個參數,若是是 put 類型,就是實際的值,反之就是 null。
第二個參數,是否包含數據,put 類型就是 true,take 就是 false。
第三個參數,執行類型,有當即返回的NOW,有異步的ASYNC,有阻塞的SYNC, 有帶超時的 TIMED。
第四個參數,只有在 TIMED類型纔有做用。less

接下來咱們來看看 xfer 究竟是何方神聖異步

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        // 從  head 開始
        for (Node h = head, p = h; p != null;) { // find & match first node
            // head 的類型。
            boolean isData = p.isData;
            // head 的數據
            Object item = p.item;
            // item != null 有 2 種狀況,一是 put 操做, 二是 take 的 itme 被修改了(匹配成功)
            // (itme != null) == isData 要麼表示 p 是一個 put 操做, 要麼表示 p 是一個還沒匹配成功的 take 操做
            if (item != p && (item != null) == isData) { 
                // 若是當前操做和 head 操做相同,就沒有匹配上,結束循環,進入下面的 if 塊。
                if (isData == haveData)   // can't match
                    break;
                // 若是操做不一樣,匹配成功, 嘗試替換 item 成功,
                if (p.casItem(item, e)) { // match
                    // 更新 head
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 喚醒原 head 線程.
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // 找下一個
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // 若是這個操做不是馬上就返回的類型    
        if (how != NOW) {                 // No matches available
            // 且是第一次進入這裏
            if (s == null)
                // 建立一個 node
                s = new Node(e, haveData);
            // 嘗試將 node 追加對隊列尾部,並返回他的上一個節點。
            Node pred = tryAppend(s, haveData);
            // 若是返回的是 null, 表示不能追加到 tail 節點,由於 tail 節點的模式和當前模式相反.
            if (pred == null)
                // 重來
                continue retry;           // lost race vs opposite mode
            // 若是不是異步操做(即馬上返回結果)
            if (how != ASYNC)
                // 阻塞等待匹配值
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

代碼有點長,其實邏輯很簡單。就是找到 head 節點,若是 head 節點是匹配的操做,就直接賦值,若是不是,添加到隊列中。
注意:隊列中永遠只有一種類型的操做,要麼是 put 類型, 要麼是 take 類型.

ConcurrentLinkedQueue 源碼分析

LinkedTransferQueue 同樣,ConcurrentLinkedQueue 同樣是採用原子變量實現的併發控制,ConcurrentLinkedQueue 是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部,當咱們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free」算法來實現。

主要方法源碼實現

  1. add:添加元素到隊列裏,添加成功返回true;
  2. offer:添加元素到隊列裏,添加成功返回true,添加失敗返回false;
  3. put:添加元素到隊列裏,若是容量滿了會阻塞直到容量不滿;
  4. poll:刪除隊列頭部元素,若是隊列爲空,返回null。不然返回元素;
  5. remove:基於對象找到對應的元素,並刪除。刪除成功返回true,不然返回false;

add方法:

public boolean add(E e) {
    return offer(e);
}

offer方法:

ConcurrentLinkedQueue是無界的,因此offer永遠返回true,不能經過返回值來判斷是否入隊成功,

public boolean offer(E e) {
    // 校驗是否爲空
    checkNotNull(e);
    //入隊前,建立一個入隊節點
    final Node<E> newNode = new Node<E>(e);

    //循環CAS直到入隊成功。
    // 一、根據tail節點定位出尾節點(last node);
    // 二、將新節點置爲尾節點的下一個節點,
    // 三、更新尾節點casTail。
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        //判斷p是否是尾節點,tail節點不必定是尾節點,判斷是否是尾節點的依據是該節點的next是否是null
        if (q == null) { 
            // p is last node
            if (p.casNext(null, newNode)) {
                 //設置P節點的下一個節點爲新節點,若是p的next爲null,說明p是尾節點,casNext返回true;
                 // 若是p的next不爲null,說明有其餘線程更新過隊列的尾節點,casNext返回false。
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向本身
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            //判斷tail節點有沒有被更新,若是沒被更新,1)p=q:p指向p.next繼續尋找尾節點;
            //若是被更新了,2)p=t:P賦值爲新的tail節點
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

poll方法:

public E poll() {
    restartFromHead:
    //兩層循環
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //隊列爲空,更新head節點
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                //p節點是null的head節點恰好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向本身。
                //從新從head節點開始
                continue restartFromHead;
            else
                p = q;    //將p執行p的下一個節點
        }
    }
}

//更新head節點
final void updateHead(Node<E> h, Node<E> p) {
    //經過CAS將head更新爲P
    if (h != p && casHead(h, p))
        h.lazySetNext(h);//把舊的head節點指向本身
}

void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

remove方法:

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        // 循環CAS直到刪除節點
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
                // 經過CAS刪除節點
                removed = p.casItem(item, null);
            }

            next = succ(p);
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

小結

本文主要介紹了兩種CAS算法實現的安全隊列,然而穩定性要較高的系統中,爲了防止生產者速度過快,致使內存溢出,一般是不建議選擇無界隊列的。固然樓主水平有限,文章中難免有紕漏,望小夥伴諒解並指出,在技術的道路上一塊兒成長。

參考連接

相關文章
相關標籤/搜索