J.U.C 之 ConcurrentLinkedQueue

ConcurrentLinkedQueue 簡介

要實現一個線程安全的隊列有兩種方式:阻塞和非阻塞。阻塞隊列無非就是鎖的應用,而非阻塞則是CAS算法的應用。下面咱們就開始一個非阻塞算法的研究:CoucurrentLinkedQueue。html

ConcurrentLinkedQueue是一個基於連接節點的無邊界的線程安全隊列,它採用FIFO原則對元素進行排序。採用「wait-free」算法(即CAS算法)來實現的。java

CoucurrentLinkedQueue規定了以下幾個不變性:node

  1. 在入隊的最後一個元素的next爲null
  2. 隊列中全部未刪除的節點的item都不能爲null且都能從head節點遍歷到
  3. 對於要刪除的節點,不是直接將其設置爲null,而是先將其item域設置爲null(迭代器會跳過item爲null的節點)
  4. 容許head和tail更新滯後。這是什麼意思呢?意思就說是head、tail不老是指向第一個元素和最後一個元素。

head的不變性和可變性:算法

不變性安全

  1. 全部未刪除的節點均可以經過head節點遍歷到
  2. head不能爲null
  3. head節點的next不能指向自身

可變性多線程

  1. head的item可能爲null,也可能不爲null
  2. 容許tail滯後head,也就是說調用succc()方法,從head不可達tail

tail的不變性和可變性併發

不變性源碼分析

  1. tail不能爲null

可變性this

  1. tail的item可能爲null,也可能不爲null
  2. tail節點的next域能夠指向自身
  3. 容許tail滯後head,也就是說調用succc()方法,從head不可達tail

ConcurrentLinkedQueue 源碼分析

CoucurrentLinkedQueue的結構由head節點和tail節點組成,每一個節點由節點元素item和指向下一個節點的next引用組成,而節點與節點之間的關係就是經過該next關聯起來的,從而組成一張鏈表的隊列。節點Node爲ConcurrentLinkedQueue的內部類,定義以下:spa

private static class Node<E> {
      /** 節點元素域 */
      volatile E item;
      volatile Node<E> next;

      //初始化,得到item 和 next 的偏移量,爲後期的CAS作準備

      Node(E item) {
          UNSAFE.putObject(this, itemOffset, item);
      }

      boolean casItem(E cmp, E val) {
          return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
      }

      void lazySetNext(Node<E> val) {
          UNSAFE.putOrderedObject(this, nextOffset, val);
      }

      boolean casNext(Node<E> cmp, Node<E> val) {
          return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
      }

      // Unsafe mechanics

      private static final sun.misc.Unsafe UNSAFE;
      /** 偏移量 */
      private static final long itemOffset;
      /** 下一個元素的偏移量 */

     private static final long nextOffset;

      static {
          try {
              UNSAFE = sun.misc.Unsafe.getUnsafe();
              Class<?> k = Node.class;
              itemOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("item"));
              nextOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("next"));
          } catch (Exception e) {
              throw new Error(e);
          }
      }
  }
複製代碼

入列

入列,咱們認爲是一個很是簡單的過程:tail節點的next執行新節點,而後更新tail爲新節點便可。從單線程角度咱們這麼理解應該是沒有問題的,可是多線程呢?若是一個線程正在進行插入動做,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲當前節點,可是若是已經有一個線程剛恰好完成了插入,那麼尾節點是否是發生了變化?對於這種狀況ConcurrentLinkedQueue怎麼處理呢?咱們先看源碼:

offer(E e):將指定元素插入都隊列尾部:

public boolean offer(E e) {
        //檢查節點是否爲null
        checkNotNull(e);
        // 建立新節點
        final Node<E> newNode = new Node<E>(e);

        //死循環 直到成功爲止
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // q == null 表示 p已是最後一個節點了,嘗試加入到隊列尾
            // 若是插入失敗,則表示其餘線程已經修改了p的指向
            if (q == null) {                                // --- 1
                // casNext:t節點的next指向當前節點
                // casTail:設置tail 尾節點
                if (p.casNext(null, newNode)) {             // --- 2
                    // node 加入節點後會致使tail距離最後一個節點相差大於一個,須要更新tail
                    if (p != t)                             // --- 3
                        casTail(t, newNode);                    // --- 4
                    return true;
                }
            }
            // p == q 等於自身
            else if (p == q)                                // --- 5
                // p == q 表明着該節點已經被刪除了
                // 因爲多線程的緣由,咱們offer()的時候也會poll(),若是offer()的時候正好該節點已經poll()了
                // 那麼在poll()方法中的updateHead()方法會將head指向當前的q,而把p.next指向本身,即:p.next == p
                // 這樣就會致使tail節點滯後head(tail位於head的前面),則須要從新設置p
                p = (t != (t = tail)) ? t : head;           // --- 6
            // tail並無指向尾節點
            else
                // tail已經不是最後一個節點,將p指向最後一個節點
                p = (p != t && t != (t = tail)) ? t : q;    // --- 7
        }
    }
複製代碼

光看源碼仍是有點兒迷糊的,插入節點一次分析就會明朗不少。

初始化

ConcurrentLinkedQueue初始化時head、tail存儲的元素都爲null,且head等於tail:

添加元素A

按照程序分析:第一次插入元素A,head = tail = dummyNode,全部q = p.next = null,直接走步驟2:p.casNext(null, newNode),因爲 p == t成立,因此不會執行步驟3:casTail(t, newNode),直接return。插入A節點後以下:

添加元素B

q = p.next = A ,p = tail = dummyNode,因此直接跳到步驟7:p = (p != t && t != (t = tail)) ? t : q;。此時p = q,而後進行第二次循環 q = p.next = null,步驟2:p == null成立,將該節點插入,由於p = q,t = tail,因此步驟3:p != t 成立,執行步驟4:casTail(t, newNode),而後return。以下:

添加節點C

此時t = tail ,p = t,q = p.next = null,和插入元素A無異,以下:

這裏整個offer()過程已經分析完成了,可能p == q 有點兒難理解,p 不是等於q.next麼,怎麼會有p == q呢?這個疑問咱們在出列poll()中分析

出列

ConcurrentLinkedQueue提供了poll()方法進行出列操做。入列主要是涉及到tail,出列則涉及到head。咱們先看源碼:

public E poll() {
        // 若是出現p被刪除的狀況須要從head從新開始
        restartFromHead:        // 這是什麼語法?真心沒有見過
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {

                // 節點 item
                E item = p.item;

                // item 不爲null,則將item 設置爲null
                if (item != null && p.casItem(item, null)) {                    // --- 1
                    // p != head 則更新head
                    if (p != h)                                                 // --- 2
                        // p.next != null,則將head更新爲p.next ,不然更新爲p
                        updateHead(h, ((q = p.next) != null) ? q : p);          // --- 3
                    return item;
                }
                // p.next == null 隊列爲空
                else if ((q = p.next) == null) {                                // --- 4
                    updateHead(h, p);
                    return null;
                }
                // 當一個線程在poll的時候,另外一個線程已經把當前的p從隊列中刪除——將p.next = p,p已經被移除不能繼續,須要從新開始
                else if (p == q)                                                // --- 5
                    continue restartFromHead;
                else
                    p = q;                                                      // --- 6
            }
        }
    }
複製代碼

這個相對於offer()方法而言會簡單些,裏面有一個很重要的方法:updateHead(),該方法用於CAS更新head節點,以下:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
複製代碼

咱們先將上面offer()的鏈表poll()掉,添加A、B、C節點結構以下:

poll A

head = dumy,p = head, item = p.item = null,步驟1不成立,步驟4:(q = p.next) == null不成立,p.next = A,跳到步驟6,下一個循環,此時p = A,因此步驟1 item != null,進行p.casItem(item, null)成功,此時p == A != h,因此執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,則將head CAS更新成B,以下:

poll B

head = B , p = head = B,item = p.item = B,步驟成立,步驟2:p != h 不成立,直接return,以下:

poll C

head = dumy ,p = head = dumy,tiem = p.item = null,步驟1不成立,跳到步驟4:(q = p.next) == null,不成立,而後跳到步驟6,此時,p = q = C,item = C(item),步驟1成立,因此講C(item)設置爲null,步驟2:p != h成立,執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),以下:

看到這裏是否是一目瞭然了,在這裏咱們再來分析offer()的步驟5:

else if(p == q){
	p = (t != (t = tail))? t : head;
}
複製代碼

ConcurrentLinkedQueue中規定,p == q代表,該節點已經被刪除了,也就說tail滯後於head,head沒法經過succ()方法遍歷到tail,怎麼作? (t != (t = tail))? t : head;(這段代碼的可讀性實在是太差了,真他媽難理解:不知道是否能夠理解爲t != tail ? tail : head)這段代碼主要是來判讀tail節點是否已經發生了改變,若是發生了改變,則說明tail已經從新定位了,只須要從新找到tail便可,不然就只能指向head了。

就上面那個咱們再次插入一個元素D。則p = head,q = p.next = null,執行步驟1: q = null且 p != t ,因此執行步驟4:,以下:

再插入元素E,q = p.next = null,p == t,因此插入E後以下:

總結

ConcurrentLinkedQueue 的非阻塞算法實現可歸納爲下面 5 點:

  1. 使用 CAS 原子指令來處理對數據的併發訪問,這是非阻塞算法得以實現的基礎。
  2. head/tail 並不是老是指向隊列的頭 / 尾節點,也就是說容許隊列處於不一致狀態。 這個特性把入隊 / 出隊時,本來須要一塊兒原子化執行的兩個步驟分離開來,從而縮小了入隊 / 出隊時須要原子化更新值的範圍到惟一變量。這是非阻塞算法得以實現的關鍵。
  3. 因爲隊列有時會處於不一致狀態。爲此,ConcurrentLinkedQueue 使用三個不變式來維護非阻塞算法的正確性。
  4. 以批處理方式來更新 head/tail,從總體上減小入隊 / 出隊操做的開銷。
  5. 爲了有利於垃圾收集,隊列使用特有的 head 更新機制;爲了確保從已刪除節點向後遍歷,可到達全部的非刪除節點,隊列使用了特有的向後推動策略。
相關文章
相關標籤/搜索