非阻塞隊列ConcurrentLinkedQueue與CAS算法應用分析

ConcurrentLinkedQueue是無阻塞隊列的一種實現, 依賴與CAS算法實現。node

入隊offer

  1. if(q==null)當前是尾節點 -> CAS賦值tail.next = newNode, 成功就跳出循環
  2. elseif(p == q)尾節點被移除 -> 從tail或head從新日後找
  3. else不是尾節點 -> 往next找

規則定義:

當一個節點的next指向自身時, 表示節點已經被移除, 註釋中還會強調這一點。算法

完整代碼(JDK8):

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
/*
* 變量說明:
*   成員變量: 
*       head: 首節點
*       tail: 尾節點, 不必定指向末尾, 兩次入隊才更新一次
*   局部變量
*         t= tail; //保存循環開始時, 當時的tail值
*         p= t; // 每次查找的起始位置, 可能指向首節點head或者臨時尾節點t
*         q= p.next; // 每次循環下一個節點
*        newNode= new Node; // 新節點
*
*
* 重要概念:
*     當p = p.next時, 表示節點已經被移除
*/
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {    // 狀況1:  p是尾節點
            // p is last node
            // p是尾節點就直接將新節點放入末尾
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time // 一次跳兩個節點, 即插入兩次, tail更新一次
                    casTail(t, newNode);  // Failure is OK. // 失敗也無妨, 說明被別的線程更新了
                return true;  // 退出循環
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)  // 狀況2:  p節點被刪除了
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            // 保存的節點p、t都已經失效了,這時須要從新檢索,從新檢索的起始位置有兩種狀況
            //     1.1. 若是tail==t,表示tail也是失效的, 那麼從head開始找
            //     1.2. 不然tail就是被其餘線程更新了, 能夠又試着從tail找
            p = (t != (t = tail)) ? t : head;
        else             // 狀況3:   沿着p往下找
            // Check for tail updates after two hops.
            // 這段簡單看做p = q就好理解了,  這麼寫是爲了提升效率:
            //     1. 狀況二中p可能指向了head(因爲tail節點失效致使的)
            //     2. 如今tail可能被其餘線程更新,也許從新指向了隊尾
            //     3. 若是是, 嘗試則從隊尾開始找, 以減小迭代次數
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

這兩段代碼看了好久, 特別記錄下:

  1. 狀況2中的p = (t != (t = tail)) ? t : head;
    (t != (t = tail))能夠分三步來看緩存

    1.1. 首先取出t
      1.2. 將tail賦值給t
      1.3. 將先前取出的t與更新後的t比較
  2. 狀況3中p = (p != t && t != (t = tail)) ? t : q;併發

    首先: p != t: 這種狀況只有可能發生在執行了狀況2後
    現狀: 這時p指向head或者中間的元素, t指向一個被刪除了的節點
    那麼若是tail被其餘線程更新了, 咱們能夠將t從新指向tail, p指向t, 就像剛進循環同樣, 從尾節點開始檢索。
    這樣比從head日後找更有效率

出隊poll

規則定義:

補充一項, item==null,也表示節點已經被刪除(參考remove方法)。高併發

/**
* updateHead
*   
*/
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

/**
 * Tries to CAS head to p. If successful, repoint old head to itself
 * as sentinel for succ(), below.
 */
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

出隊設值操做:

先更新head, 再將舊head的next指向本身this

Note:

CAS算法實現依靠Unsafe.compareAndSwapObject實現

UNSAFE.compareAndSwapObject(對象, 字段偏移量, 當前值, 新值)線程

能夠爲對象中的某個字段實現CAS操做

lazySet依賴UNSAFE.putOrderedObject實現

UNSAFE.putOrderedObject(對象, 字段偏移量, 新值)rest

這個只能用在 volatile字段上 我的理解: volatile的設值會致使本地緩存失效, 那麼須要從新從主存讀取, 使用這個方法可使寄存器緩存依舊有效, 沒必要急於從主存取值。 使用目的: 移除節點時, 須要更新節點的next指向自身, 但如今next指向的數據實際是有效的; 高併發狀況下,若是offser方法已經緩存了這個next值, 直接設置next會致使緩存行失效, CPU須要從新讀取next; 而使用putOrderedObject可讓offser從這個next繼續檢索
相關文章
相關標籤/搜索