源碼|併發一枝花之ConcurrentLinkedQueue【僞】

首先聲明,本文是僞源碼分析。主要是基於狀態機本身實現一個簡化的併發隊列,有助於讀者掌握併發程序設計的核心——狀態機;最後對源碼實現略有說起。java

ConcurrentLinkedQueue不支持阻塞,沒有BlockingQueue那麼易用;但在中等規模的併發場景下,其性能卻比BlockingQueue高很多,並且至關穩定。同時,ConcurrentLinkedQueue是學習CAS的經典案例。根據github的code results排名,ConcurrentLinkedQueue(164k)也十分流行,比我想象中的使用量大多了。很是值得一講。node

對於狀態機和併發程序設計的基本理解,能夠參考源碼|併發一枝花之BlockingQueue,建議第一次接觸狀態機的同窗速讀參考文章以後,再來閱讀此文章。git

JDK版本:oracle java 1.8.0_102github

準備知識:CAS

讀者能夠跳過這部分,後面講到offer()方法的實現時再回顧。面試

悲觀鎖與樂觀鎖

  • 悲觀鎖:假定併發環境是悲觀的,若是發生併發衝突,就會破壞一致性,因此要經過獨佔鎖完全禁止衝突發生。有一個經典比喻,「若是你不鎖門,那麼搗蛋鬼就回闖入並搞得一團糟」,因此「你只能一次打開門放進一我的,才能時刻盯緊他」。
  • 樂觀鎖:假定併發環境是樂觀的,即,雖然會有併發衝突,但衝突可發現且不會形成損害,因此,能夠不加任何保護,等發現併發衝突後再決定放棄操做仍是重試。可類比的比喻爲,「若是你不鎖門,那麼雖然搗蛋鬼會闖入,但他們一旦打算破壞你就能知道」,因此「你大能夠放進全部人,等發現他們想破壞的時候再作決定」。

一般認爲樂觀鎖的性能比悲觀所更高,特別是在某些複雜的場景。這主要因爲悲觀鎖在加鎖的同時,也會把某些不會形成破壞的操做保護起來;而樂觀鎖的競爭則只發生在最小的併發衝突處,若是用悲觀鎖來理解,就是「鎖的粒度最小」。但樂觀鎖的設計每每比較複雜,所以,複雜場景下仍是多用悲觀鎖。安全

首先保證正確性,有必要的話,再去追求性能。bash

CAS

樂觀鎖的實現每每須要硬件的支持,多數處理器都都實現了一個CAS指令,實現「Compare And Swap」的語義(這裏的swap是「換入」,也就是set),構成了基本的樂觀鎖。數據結構

CAS包含3個操做數:併發

  • 須要讀寫的內存位置V
  • 進行比較的值A
  • 擬寫入的新值B

當且僅當位置V的值等於A時,CAS纔會經過原子方式用新值B來更新位置V的值;不然不會執行任何操做。不管位置V的值是否等於A,都將返回V原有的值。oracle

一個有意思的事實是,「使用CAS控制併發」與「使用樂觀鎖」並不等價。CAS只是一種手段,既能夠實現樂觀鎖,也能夠實現悲觀鎖。樂觀、悲觀只是一種併發控制的策略。下文將分別用CAS實現悲觀鎖和樂觀鎖? 咱們先不講JDK提供的實現,用狀態機模型來分析一下,看咱們能不能本身實現一版。

隊列的狀態機模型

狀態機模型與是否須要併發無關,一個類無論是不是線程安全的,其狀態機模型從類被實現(此時,全部類行爲都是肯定的)開始就是肯定的。接口是類行爲的一個子集,咱們從接口出發,逐漸構建出簡化版ConcurrentLinkedQueue的狀態機模型。

隊列接口

ConcurrentLinkedQueue實現了Queue接口:

public interface BlockingQueue<E> extends Queue<E> {
  boolean add(E e);

  boolean offer(E e);

  E remove();

  E poll();

  E element();
  
  E peek();
}
複製代碼

須要關注的是一對方法:

  • offer():入隊,成功返回true,失敗返回false。JDK中ConcurrentLinkedQueue實現爲無界隊列,這裏咱們也只討論無界的狀況——所以,offer()方法必返回true。
  • poll():出隊,有元素返回元素,沒有就返回null。

同時,理想的線程安全隊列中,入隊和出隊之間不該該存在競爭,這樣入隊的狀態機模型和出隊的狀態機模型能夠徹底解耦,互不影響。

對咱們的狀態機做出兩個假設:

  • 假設1:只支持這入隊、出隊兩種行爲。
  • 假設2:入隊、出隊之間不存在競爭,即入隊模型與出隊模型是對偶、獨立的兩個狀態機。

從而,能夠先分析入隊,再參照分析出隊;而後可嘗試去掉假設2,看如何完善咱們的實現來保證假設2成立;最後看看真·神Doug Lea如何實現,學習一波

狀態機定義

如今基於假設1和假設2,嘗試定義入隊模型的狀態機。

咱們構造一個簡化的場景:存在2個生產者P一、P2,同時觸發入隊操做

狀態集

若是是單線程環境,入隊操做將是這樣的:

// 準備
newNode.next = null;
curTail = tail;

// 入隊前
assert tail == curTail && tail.next == null; // 狀態S1
// 開始入隊
tail.next = newNode; // 事件E1
// 入隊中
assert tail == curTail && tail.next == newNode; // 狀態S2
tail = tail.next; // 事件E2
// 結束入隊
// 入隊後
assert tail == newNode && tail.next == null; // 狀態S3,合併到狀態S1
複製代碼

該過程涉及對兩個域的修改:tail.next、tail。則隨着操做的進行,隊列會經歷2種狀態:

  • 狀態S1:事件E1執行前,tail指向實際的尾節點curTail,tail.next==null。如生產者P一、P2都尚未觸發入隊時,隊列處於狀態S1;生產者P1完成入隊P2還沒觸發入隊時,隊列處於狀態S1。
  • 狀態S2:事件E1執行後、E2執行前,tail指向舊的尾節點curTail,tail.next==newNode。
  • 狀態S3:事件E2執行後,tail指向新的尾節點newNode,tail.next==null。同狀態S1,合併。

狀態轉換集

兩個事件分別對應兩個狀態轉換:

  • 狀態轉換T1:S1->S2,即tail.next = newNode。
  • 狀態轉換T2:S2->S1,即tail = tail.next。

是否是很熟悉?由於ConcurrentLinkedQueue也是隊列,必然同BlockingQueue類似甚至相同。區別在於如何維護這些狀態和狀態轉換。

自擼ConcurrentLinkedQueue

依賴CAS,兩個狀態轉換T一、T2均可以實現爲原子操做。留給咱們的問題是,如何維護合法的狀態轉換。

入隊方法offer()

入隊過程須要通過兩個狀態轉換,且這兩個狀態轉換必須連續發生。

不嚴謹。「連續」並非必要的,最後分析源碼的時候會看到。不過,咱們暫時使用強一致性的模型。

思路1:讓同一個生產者P1連續完成兩個狀態轉換T一、T2,保證P2不會插入進來

LinkedBlockingQueue的思路便是如此。這是一種悲觀策略——一次開門只放進來一個生產者,彷佛只能像LinkedBlockingQueue那樣,用傳統的鎖putLock實現,實際上,依靠CAS也能實現:

public class ConcurrentLinkedQueue1<E> {
  private volatile Node<E> tail;

  public ConcurrentLinkedQueue1() {
    throw new UnsupportedOperationException("Not implement");
  }

  public boolean offer(E e) {
    Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
    while (true) {
      Node<E> curTail = tail;
      AtomicReference<Node<E>> curNext = curTail.next;
      // 嘗試T1:CAS設置tail.next
      if (curNext.compareAndSet(null, newNode)) {
        // 成功者視爲得到獨佔鎖,完成了T1。直接執行T2:設置tail
        tail = curNext.get();
        return true;
      }
      // 失敗者自旋等待
    }
  }

  private static class Node<E> {
    private volatile E item;
    private AtomicReference<Node<E>> next;

    public Node(E item, AtomicReference<Node<E>> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

思路2:生產者P1完成狀態轉換T1後,P2代勞完成狀態轉換T2

再來分析下T一、T2兩個狀態轉換:

  • T1涉及newNode,只能由封閉持有newNode的生產者P1完成
  • T2只涉及隊列中的信息,任何持有隊列的生產者都有能力完成。P1能夠,P2也能夠

思路1是悲觀的,認爲T一、T2必須都由P1完成,若是P2插入就會「搞破壞」。而思路2則打開大門,歡迎任何「有能力」的生產者完成T2,是典型的樂觀策略。

public class ConcurrentLinkedQueue2<E> {
  private AtomicReference<Node<E>> tail;

  public ConcurrentLinkedQueue2() {
    throw new UnsupportedOperationException("Not implement");
  }

  public boolean offer(E e) {
    Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
    while (true) {
      Node<E> curTail = tail.get();
      AtomicReference<Node<E>> curNext = curTail.next;
      // 嘗試T1:CAS設置tail.next
      if (curNext.compareAndSet(null, newNode)) {
        // 成功者完成了T1,隊列處於S2,繼續嘗試T2:CAS設置tail
        tail.compareAndSet(curTail, curNext.get());
        // 成功表示該生產者P1完成連續完成了T一、T2,隊列處於S1
        // 失敗表示T2已經由生產者P2完成,隊列處於S1
        return true;
      }
      // 失敗者得知隊列處於S2,則嘗試T2:CAS設置tail
      tail.compareAndSet(curTail, curNext.get());
      // 若是成功,隊列轉換到S1;若是失敗,隊列表示T2已經由生產者P1完成,隊列已經處於S1
      // 而後循環,從新嘗試T1
    }
  }

  private static class Node<E> {
    private volatile E item;
    private AtomicReference<Node<E>> next;

    public Node(E item, AtomicReference<Node<E>> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

減小無效的競爭

咱們涉及的狀態比較少(只有2個狀態),繼續看看可否減小無效的競爭,好比:

  • 前兩種實現的第一步都是CAS嘗試T1,失敗了就退化成一次探查(compare and swap中的compare)。發起CAS前,可能隊列已經處於S2,這時CAS嘗試T1就成了浪費,只須要探查便可。這有點像DCL單例的思路(面試中單例模式有幾種寫法?),能夠直接經過tail.next判斷隊列是否處於S1,來完成一部分探查,以減小無效的競爭
public class ConcurrentLinkedQueue3<E> {
  private AtomicReference<Node<E>> tail;

  public ConcurrentLinkedQueue3() {
    throw new UnsupportedOperationException("Not implement");
  }

  public boolean offer(E e) {
    Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
    while (true) {
      Node<E> curTail = tail.get();
      AtomicReference<Node<E>> curNext = curTail.next;

      // 先檢查一下隊列狀態的狀態,tail.next==null表示隊列處於狀態S1,僅此時纔有CAS嘗試T1的必要
      if (curNext.get() == null) {
        // 若是處於S1,嘗試T1:CAS設置tail.next
        if (curNext.compareAndSet(null, newNode)) {
          // 成功者完成了T1,隊列處於S2,繼續嘗試T2:CAS設置tail
          tail.compareAndSet(curTail, curNext.get());
          // 成功表示該生產者P1完成連續完成了T一、T2,隊列處於S1
          // 失敗表示T2已經由生產者P2完成,隊列處於S1
          return true;
        }
      }
      // 不然隊列處於處於S2,或CAS嘗試T1的失敗者得知隊列處於S2,則嘗試T2:CAS設置tail
      tail.compareAndSet(curTail, curNext.get());
      // 若是成功,隊列轉換到S1;若是失敗,隊列表示T2已經由生產者P1完成,隊列已經處於S1
      // 而後循環,從新嘗試T1
    }
  }

  private static class Node<E> {
    private volatile E item;
    private AtomicReference<Node<E>> next;

    public Node(E item, AtomicReference<Node<E>> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

注意,上述實現中,while代碼塊後都沒有返回值。這是被編譯器容許的,由於編譯器能夠分析出,該方法不可能運行到while代碼塊以後,因此while代碼塊後的返回值語句也是無效的。

出隊方法poll()

對偶的構造一個簡化的場景:存在2個消費者C一、C2,同時觸發出隊操做

不須要考慮悲觀策略和優化方案,咱們嘗試基於思路2的第一種實現擼一版基礎的poll()方法。

而後,,,沒擼動。想了一下,樸素鏈表(如LinkedList)中,直接用head表示維護頭結點沒法區分「已取出item未移動head指針」和「未取出item未移動head指針」(同「已取出item已移動head指針」)兩種狀態。因此仍是寫一寫才知道深淺啊,碰巧前兩天寫了BlockingQueue的分析,dummy node正好派上用場。

隊列初始化以下:

dummy = new Node(null, null);
// tail = dummy; // 後面會用到
// head = dummy.next; // dummy.next 表示實際的頭結點,但咱們不須要存儲它
複製代碼

狀態機

單線程環境的出隊過程:

// 準備
curDummy = dummy;
curNext = curDummy.next;
oldItem = curNext.item;

// 出隊前
assert dummy == curDummy && dummy.next.item == oldItem; // 狀態S1
// 開始出隊
dummy.next.item = null; // 事件E1
// 出隊中
assert dummy == curDummy && dummy.next.item == null; // 狀態S2
dummy = dummy.next; // 事件E2
// 結束出隊
// 出隊後
assert dummy == curNext && dummy.next.item != null; // 狀態S3,合併到狀態S1
複製代碼

狀態:

  • 狀態S1:事件E1執行前,dummy指向實際的dummy節點curDummy,dummy.next.item== oldItem。如消費者C一、C2都尚未觸發出隊時,隊列處於狀態S1;消費者C1完成入隊C2還沒觸發出隊時,隊列處於狀態S1。
  • 狀態S2:事件E1執行後、E2執行前,dummy指向舊的dummy節點curDummy,dummy.next.item==null。
  • 狀態S3:事件E2執行後,dummy指向新的dummy節點curNext,dummy.next.item!=null。這在本質上同狀態S1是一致的,合併。

狀態轉換:

  • 狀態轉換T1:S1->S2,即dummy.next.item = null。
  • 狀態轉換T2:S2->S1,即dummy = dummy.next。

代碼

public class ConcurrentLinkedQueue4<E> {
  private AtomicReference<Node<E>> dummy;

  public ConcurrentLinkedQueue4() {
    dummy = new AtomicReference<>(new Node<>(null, null));
  }

  public E poll() {
    while (true) {

      Node<E> curDummy = dummy.get();
      Node<E> curNext = curDummy.next;
      E oldItem = curNext.item.get();
      // 嘗試T1:CAS設置dummy.next.item
      if (curNext.item.compareAndSet(oldItem, null)) {
        // 成功者完成了T1,隊列處於S2,繼續嘗試T2:CAS設置dummy
        dummy.compareAndSet(curDummy, curNext);
        // 成功表示該消費者C1完成連續完成了T一、T2,隊列處於S1
        // 失敗表示T2已經由消費者C2完成,隊列處於S1
        return oldItem;
      }
      // 失敗者得知隊列處於S2,則嘗試T2:CAS設置dummy
      dummy.compareAndSet(curDummy, curNext);
      // 若是成功,隊列轉換到S1;若是失敗,隊列表示T2已經由消費者P1完成,隊列已經處於S1
      // 而後循環,從新嘗試T1
    }
  }

  private static class Node<E> {
    private AtomicReference<E> item;
    private volatile Node<E> next;

    public Node(AtomicReference<E> item, Node<E> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

另外一種狀態機

實際上,前面的討論有意迴避了一個問題——若是入隊/出隊操做順序不一樣,咱們會構造出不一樣的狀態機。這至關於同一個類的另外一種實現,不違反前面做出的聲明:

狀態機模型與是否須要併發無關,一個類無論是不是線程安全的,其狀態機模型從類被實現(此時,全部類行爲都是肯定的)開始就是肯定的。

繼續以出隊爲例,假設在單線程下,採用這樣的順序出隊:

// 準備
curDummy = dummy;
curNext = curDummy.next;
oldItem = curNext.item;

// 出隊前
assert dummy == curDummy && dummy.item == null; // 狀態S1

// 開始出隊
dummmy = dummy.next; // 事件E1
// 出隊中
assert dummy == curNext && dummy.item == oldItem; // 狀態S2
dummy.item = null; // 事件E2
// 結束出隊

// 出隊後
assert dummy == curNext && dummy.item == null; // 狀態S3,合併到狀態S1
複製代碼

看起來,這樣的操做順序更容易定義各狀態:

  • 狀態S1:事件E1執行前,dummy指向實際的dummy節點curDummy,dummy.item == null。如消費者C一、C2都尚未觸發出隊時,隊列處於狀態S1;消費者C1完成入隊C2還沒觸發出隊時,隊列處於狀態S1。
  • 狀態S2:事件E1執行後、E2執行前,dummy指向新的dummy節點curNext,dummy.item == oldItem。
  • 狀態S3:事件E2執行後,dummy指向新的dummy節點curNext,dummy.item == null。顯然同狀態S1,合併。

狀態轉換:

  • 狀態轉換T1:S1->S2,即dummmy = dummy.next。
  • 狀態轉換T2:S2->S1,即dummy.item = null。

實現以下:

public class ConcurrentLinkedQueue5<E> {
  private AtomicReference<Node<E>> dummy;

  public ConcurrentLinkedQueue5() {
    dummy = new AtomicReference<>(new Node<>(null, null));
  }

  public E poll() {
    while (true) {

      Node<E> curDummy = dummy.get();
      Node<E> curNext = curDummy.next;
      E oldItem = curNext.item.get();
      // 嘗試T1:CAS設置dummmy
      if (dummy.compareAndSet(curDummy, curNext)) {
        // 成功者完成了T1,隊列處於S2,繼續嘗試T2:CAS設置dummy.item
        curDummy.item.compareAndSet(oldItem, null);
        // 成功表示該消費者C1完成連續完成了T一、T2,隊列處於S1
        // 失敗表示T2已經由消費者C2完成,隊列處於S1
        return oldItem;
      }
      // 失敗者得知隊列處於S2,則嘗試T2:CAS設置dummy.item
      curDummy.item.compareAndSet(oldItem, null);
      // 若是成功,隊列轉換到S1;若是失敗,隊列表示T2已經由消費者P1完成,隊列已經處於S1
      // 而後循環,從新嘗試T1
    }
  }

  private static class Node<E> {
    private AtomicReference<E> item;
    private volatile Node<E> next;

    public Node(AtomicReference<E> item, Node<E> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

一個trick

實現上面狀態機的過程當中,我想出了一個針對出隊操做的trick:能夠去掉dummy node,用head維護頭結點+一步狀態轉換完成出隊

對啊,我寫着寫着又擼出來了。。。

去掉了dummy node,那麼head.item的初始狀態就是非空的,下面是簡化的狀態機。

單線程出隊的操做順序:

// 準備
curHead = head;
curNext = curHead.next;
oldItem = curHead.item;

// 出隊前
assert head == curHead; // 狀態S1

// 出隊
head = head.next; // 事件E1

// 出隊後
assert head == curNext; // 狀態S2,合併到狀態S1
複製代碼

出隊只須要嘗試head後移,成功者可從舊的頭結點curHead中取出item,以後curHead將被廢棄;失敗者再從新嘗試便可。若是在嘗試前就獲得了item的引用,那麼E1發生後,無論成功與否,在curHead上作什麼都是無所謂的了,由於事實上沒有任何消費者會再去訪問它。

這是一個單狀態的狀態機,則狀態:

  • 狀態S1:head指向實際的頭節點curHead。隊列始終處於狀態S1。
  • 狀態S2:head指向新的頭節點curNext。同S1,合併

狀態轉換:

  • 狀態轉換T1:S1->S1,即head = head.next。

實現以下:

public class ConcurrentLinkedQueue6<E> {
  private AtomicReference<Node<E>> head;

  public ConcurrentLinkedQueue6() {
    throw new UnsupportedOperationException("Not implement");
  }

  public E poll() {
    while (true) {

      Node<E> curHead = head.get();
      Node<E> curNext = curHead.next;
      // 嘗試T1:CAS設置head
      if (head.compareAndSet(curHead, curNext)) {
        // 成功者完成了T1,隊列處於S1
        return curHead.item; // 只讓成功者取出item
      }
      // 失敗者重試嘗試
    }
  }

  private static class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    public Node(E item, Node<E> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

其餘特殊狀況

前面都是基於假設2「入隊、出隊無競爭」討論的。如今須要放開假設2,看如何完善已有的實現以保證假設2成立。或者若是不能保證假設2的話,如何解決競爭問題。

根據對LinkedBlockingQueue的分析,咱們得知,若是底層數據結構是樸素鏈表,那麼隊列空或長度爲1的時候,head、tail都指向同一個節點(或都爲null),這時必然存在競爭;dummy node較好的解決了這一問題。ConcurrentLinkedQueue4是基於dummy node的方案,咱們嘗試在此基礎上修改。

回顧dummy node的使用方法(配合ConcurrentLinkedQueue2和ConcurrentLinkedQueue4作了調整和精簡):

  • 初始化鏈表時,建立dummy node:
    • dummy = new Node(null, null)
    • // head = dummy.next // head 爲 null <=> 隊列空
    • tail = dummy // tail.item 爲 null <=> 隊列空
  • 在隊尾入隊時,tail後移:
    • tail.next = new Node(newItem, null)
    • tail = tail.next
  • 在隊頭出隊時,dummy後移,同步更新head:
    • oldItem = dummy.next.item // == head.item
    • dummy.next.item = null
    • dummy = dummy.next
    • // head = dummy.next
    • return oldItem

下面分狀況討論。

case1:隊列空

隊列空時,隊列處於一個特殊的狀態,從該狀態出發,僅能完成入隊相關的狀態轉換——通俗講就是隊列空時只容許入隊操做。這時消除競爭很簡單,只容許入隊不容許出隊便可:

public class ConcurrentLinkedQueue7<E> {
  private AtomicReference<Node<E>> dummy;
  private AtomicReference<Node<E>> tail;

  public ConcurrentLinkedQueue7() {
    Node<E> initNode = new Node<E>(
        new AtomicReference<E>(null), new AtomicReference<Node<E>>(null));
    dummy = new AtomicReference<>(initNode);
    tail = new AtomicReference<>(initNode);
    // Node<E> head = dummy.get().next.get();
  }

  public boolean offer(E e) {
    Node<E> newNode = new Node<E>(new AtomicReference<>(e), new AtomicReference<>(null));
    while (true) {
      Node<E> curTail = tail.get();
      AtomicReference<Node<E>> curNext = curTail.next;
      if (curNext.compareAndSet(null, newNode)) {
        tail.compareAndSet(curTail, curNext.get());
        return true;
      }
      tail.compareAndSet(curTail, curNext.get());
    }
  }

  public E poll() {
    while (true) {
      Node<E> curDummy = dummy.get();
      Node<E> curNext = curDummy.next.get();
      // 既能夠用 dummy.next == null (head) 判空,也能夠用 tail.item == null
      // 不過鑑於處於poll()方法中,使用 dummy.next 可讀性更好
      if (curNext == null) {
        return null;
      }
      E oldItem = curNext.item.get();
      if (curNext.item.compareAndSet(oldItem, null)) {
        dummy.compareAndSet(curDummy, curNext);
        return oldItem;
      }
      dummy.compareAndSet(curDummy, curNext);
    }
  }

  private static class Node<E> {
    private AtomicReference<E> item;
    private AtomicReference<Node<E>> next;

    public Node(AtomicReference<E> item, AtomicReference<Node<E>> next) {
      this.item = item;
      this.next = next;
    }
  }
}
複製代碼

ConcurrentLinkedQueue7須要原子的操做item和next,所以Node的item、next域都被聲明爲了AtomicReference。

隊列空的時候:offer()方法同ConcurrentLinkedQueue2#offer(),不須要作特殊處理;poll()方法在ConcurrentLinkedQueue4#poll()的基礎上,增長了32-34行的隊列空檢查。須要注意的是,檢查必須放在隊列轉換的過程當中,防止消費者C2第一次嘗試時隊列非空,但第二次嘗試時隊列變空(因爲C1取出了惟一的元素)的狀況。

case2:隊列長度等於1

隊列長度等於1時,入隊與出隊不會同時修改同一節點,這時必定不會發生競爭。分析以下。

假設存在一個生產者P1,一個消費者C1,同時觸發入隊/出隊,隊列中只有一個元素,因此只兩個節點dummyNode、singleNode則此時:

assert dummy == dummyNode;
assert dummy.next.item == singleNode.item;

assert tail == singleNode;
assert tail.next == singleNode.next;
複製代碼

回顧ConcurrentLinkedQueue7的實現:

  • poll()方法修改引用dummy、singleNode.item
  • offer()方法操tail、singleNode.next

所以,因爲dummy node的引入,隊列長度爲1時,入隊、出隊之間天生就不存在競爭。

小結

至此,咱們從最簡單的場景觸發,基於狀態機實現了一個支持高性能offer()、poll()方法的ConcurrentLinkedQueue7。CAS的好處暫且不表,重要的是基於狀態機進行併發程序設計的思想。只有抓住其狀態機的本質,才能設計出正確、高效的併發類。

若是仍是沒有體會到狀態機的精妙之處,能夠拋開狀態機,並本身嘗試基於樂觀策略實現ConcurrentLinkedQueue。(之因此要基於樂觀策略,是由於悲觀策略能夠認爲是樂觀策略的是特例,容易讓人忽略其狀態機的本質)

JDK實現

但願看到這裏,你已經理解了ConcurrentLinkedQueue的狀態機本質,由於下面就再也不是本文的重點。

真·神Doug Lea的實現基於一個弱一致性的狀態機:容許隊列處於多種不一致的狀態,經過恰當的選擇「不一致的狀態」,能作到用戶無感;雖然增長了狀態機的複雜度,但也進一步提升了性能。

網上分析文章很是多,讀者可自行閱讀,有必定難度。本文不打算講解Doug Lea的實現,貼出源碼僅供你們膜拜

構造方法

經常使用的是默認的空構造函數:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
...
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
...
}
複製代碼

Doug Lea也使用了dummy node,不過命名爲了head。初始化方法同咱們實現的ConcurrentLinkedQueue7。

入隊方法offer()

ConcurrentLinkedQueue7#offer()至關於ConcurrentLinkedQueue#offer()的一個特例。

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            if (p.casNext(null, newNode)) {
                if (p != t)
                    casTail(t, newNode);
                return true;
            }
        }
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
複製代碼

具體來說,ConcurrentLinkedQueue容許的多個狀態大致是這樣的:

  • 狀態S1:一致;newNode已銜接在tail.next,但tail指向倒數第1個節點
  • 狀態S2:不一致;newNode已銜接在tail.next,但tail指向倒數第2個節點
  • 狀態S3:不一致;newNode已銜接在tail.next,但tail指向倒數第3個節點
  • ...

狀態轉換的規則也隨之打破——再也不須要連續完成T一、T2,能夠連續執行屢次類T1,最後執行一次類T2

for循環中的幾個分支就是在處理這些一致和不一致的狀態。咱們前面定義的狀態機空間中只容許狀態S一、S2,所以是一個子集。增長的這些不一致的狀態主要是爲了減小CAS次數,進一步提升隊列性能,這包含兩個重要意義:

  • 下降延遲:部分入隊請求再也不須要走完完整的狀態轉換,只須要循環到tail.next.cas(null, newNode)成功。
  • 提升吞吐:以前每一次入隊請求都要設置一次tail節點;目前只須要積攢幾回入隊,在某個新的newNode入隊時,直接嘗試tail.cas(t, newNode),將tail跳躍到最新的newNode。

增長這些不一致的狀態是很危險的,如S3,當隊列長度爲1的時候,tail與head的位置存在交叉。Doug Lea牛逼之處在於,在保證正確性的前提下,不只經過增長狀態提升了性能,還減小了實際的CAS次數。

出隊方法poll()

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                if (p != h)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
複製代碼

分析方法相似於offer()。注意下updateHead()。

未完

原本是想分析ConcurrentLinkedQueue源碼的,沒想到寫完狀態機就3600多字了,乾貨卻很少。前路漫漫,源碼咱下回見。


本文連接:源碼|併發一枝花之ConcurrentLinkedQueue【僞】
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。

相關文章
相關標籤/搜索