併發容器之ConcurrentLinkedQueue

1.ConcurrentLinkedQueue簡介

在單線程編程中咱們會常常用到一些集合類,好比ArrayList,HashMap等,可是這些類都不是線程安全的類。在面試中也常常會有一些考點,好比ArrayList不是線程安全的,Vector是線程安全。而保障Vector線程安全的方式,是很是粗暴的在方法上用synchronized獨佔鎖,將多線程執行變成串行化。要想將ArrayList變成線程安全的也可使用Collections.synchronizedList(List<T> list)方法ArrayList轉換成線程安全的,但這種轉換方式依然是經過synchronized修飾方法實現的,很顯然這不是一種高效的方式,同時,隊列也是咱們經常使用的一種數據結構,爲了解決線程安全的問題,Doug Lea大師爲咱們準備了ConcurrentLinkedQueue這個線程安全的隊列。從類名就能夠看的出來實現隊列的數據結構是鏈式。html

1.1 Node

要想先學習ConcurrentLinkedQueue天然而然得先從它的節點類看起,明白它的底層數據結構。Node類的源碼爲:java

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
		.......
}
複製代碼

Node節點主要包含了兩個域:一個是數據域item,另外一個是next指針,用於指向下一個節點從而構成鏈式隊列。而且都是用volatile進行修飾的,以保證內存可見性(關於volatile能夠看這篇文章)。另外ConcurrentLinkedQueue含有這樣兩個成員變量:node

private transient volatile Node<E> head;
private transient volatile Node<E> tail;
複製代碼

說明ConcurrentLinkedQueue經過持有頭尾指針進行管理隊列。當咱們調用無參構造器時,其源碼爲:面試

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
複製代碼

head和tail指針會指向一個item域爲null的節點,此時ConcurrentLinkedQueue狀態以下圖所示:編程

如圖,head和tail指向同一個節點Node0,該節點item域爲null,next域爲null。安全

1.ConcurrentLinkedQueue初始化狀態.png

1.2 操做Node的幾個CAS操做

在隊列進行出隊入隊的時候免不了對節點須要進行操做,在多線程就很容易出現線程安全的問題。能夠看出在處理器指令集可以支持CMPXCHG指令後,在java源碼中涉及到併發處理都會使用CAS操做(關於CAS操做能夠看這篇文章的第3.1節),那麼在ConcurrentLinkedQueue對Node的CAS操做有這樣幾個:數據結構

//更改Node中的數據域item	
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指針域next
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指針域next
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
複製代碼

能夠看出這些方法其實是經過調用UNSAFE實例的方法,UNSAFE爲sun.misc.Unsafe類,該類是hotspot底層方法,目前爲止瞭解便可,知道CAS的操做歸根結底是由該類提供就好。多線程

2.offer方法

對一個隊列來講,插入知足FIFO特性,插入元素老是在隊列最末尾的地方進行插入,而取(移除)元素老是從隊列的隊頭。全部要想可以完全弄懂ConcurrentLinkedQueue天然而然是從offer方法和poll方法開始。那麼爲了可以理解offer方法,採用debug的方式來一行一行的看代碼走。另外,在看多線程的代碼時,可採用這樣的思惟方式:併發

單個線程offer 多個線程offer 部分線程offer,部分線程poll ----offer的速度快於poll --------隊列長度會愈來愈長,因爲offer節點老是在對隊列隊尾,而poll節點老是在隊列對頭,也就是說offer線程和poll線程二者並沒有「交集」,也就是說兩類線程間並不會相互影響,這種狀況站在相對速率的角度來看,也就是一個"單線程offer" ----offer的速度慢於poll --------poll的相對速率快於offer,也就是隊頭刪的速度要快於隊尾添加節點的速度,致使的結果就是隊列長度會愈來愈短,而offer線程和poll線程就會出現「交集」,即那一時刻就能夠稱之爲offer線程和poll線程同時操做的節點爲 臨界點 ,且在該節點offer線程和poll線程一定相互影響。根據在臨界點時offer和poll發生的相對順序又可從兩個角度去思考:1. 執行順序爲offer-->poll-->offer,即表現爲當offer線程在Node1後插入Node2時,此時poll線程已經將Node1刪除,這種狀況很顯然須要在offer方法中考慮; 2.執行順序可能爲:poll-->offer-->poll,即表現爲當poll線程準備刪除的節點爲null時(隊列爲空隊列),此時offer線程插入一個節點使得隊列變爲非空隊列高併發

先看這麼一段代碼:

1. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
2. queue.offer(1);
3. queue.offer(2);
複製代碼

建立一個ConcurrentLinkedQueue實例,先offer 1,而後再offer 2。offer的源碼爲:

public boolean offer(E e) {
1.    checkNotNull(e);
2.    final Node<E> newNode = new Node<E>(e);

3.    for (Node<E> t = tail, p = t;;) {
4.        Node<E> q = p.next;
5.        if (q == null) {
6.            // p is last node
7.            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
               // and for newNode to become "live".
8.                if (p != t) // hop two nodes at a time
9.                    casTail(t, newNode);  // Failure is OK.
10.                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
11.        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
12.            p = (t != (t = tail)) ? t : head;
           else
            // Check for tail updates after two hops.
13.            p = (p != t && t != (t = tail)) ? t : q;
    }
}
複製代碼

單線程執行角度分析

先從單線程執行的角度看起,分析offer 1的過程。第1行代碼會對是否爲null進行判斷,爲null的話就直接拋出空指針異常,第2行代碼將e包裝成一個Node類,第3行爲for循環,只有初始化條件沒有循環結束條件,這很符合CAS的「套路」,在循環體CAS操做成功會直接return返回,若是CAS操做失敗的話就在for循環中不斷重試直至成功。這裏實例變量t被初始化爲tail,p被初始化爲t即tail。爲了方便下面的理解,p被認爲隊列真正的尾節點,tail不必定指向對象真正的尾節點,由於在ConcurrentLinkedQueue中tail是被延遲更新的,具體緣由咱們慢慢來看。代碼走到第3行的時候,t和p都分別指向初始化時建立的item域爲null,next域爲null的Node0。第4行變量q被賦值爲null,第5行if判斷爲true,在第7行使用casNext將插入的Node設置成當前隊列尾節點p的next節點,若是CAS操做失敗,這次循環結束在下次循環中進行重試。CAS操做成功走到第8行,此時p==t,if判斷爲false,直接return true返回。若是成功插入1的話,此時ConcurrentLinkedQueue的狀態以下圖所示:

2.offer 1後隊列的狀態.png

如圖,此時隊列的尾節點應該爲Node1,而tail指向的節點依然仍是Node0,所以能夠說明tail是延遲更新的。那麼咱們繼續來看offer 2的時候的狀況,很顯然此時第4行q指向的節點不爲null了,而是指向Node1,第5行if判斷爲false,第11行if判斷爲false,代碼會走到第13行。好了,再插入節點的時候咱們會問本身這樣一個問題?上面已經解釋了tail並非指向隊列真正的尾節點,那麼在插入節點的時候,咱們是否是應該最開始作的就是找到隊列當前的尾節點在哪裏才能插入?那麼第13行代碼就是找出隊列真正的尾節點

定位隊列真正的對尾節點

p = (p != t && t != (t = tail)) ? t : q;
複製代碼

咱們來分析一下這行代碼,若是這段代碼在單線程環境執行時,很顯然因爲p==t,此時p會被賦值爲q,而q等於Node<E> q = p.next,即Node1。在第一次循環中指針p指向了隊列真正的隊尾節點Node1,那麼在下一次循環中第4行q指向的節點爲null,那麼在第5行中if判斷爲true,那麼在第7行依然經過casNext方法設置p節點的next爲當前新增的Node,接下來走到第8行,這個時候p!=t,第8行if判斷爲true,會經過casTail(t, newNode)將當前節點Node設置爲隊列的隊尾節點,此時的隊列狀態示意圖以下圖所示:

3.隊列offer 2後的狀態.png

tail指向的節點由Node0改變爲Node2,這裏的casTail失敗不須要重試的緣由是,offer代碼中主要是經過p的next節點q(Node<E> q = p.next)決定後面的邏輯走向的,當casTail失敗時狀態示意圖以下:

4.隊列進行入隊操做後casTail失敗後的狀態圖.png

如圖,若是這裏casTail設置tail失敗即tail仍是指向Node0節點的話,無非就是多循環幾回經過13行代碼定位到隊尾節點

經過對單線程執行角度進行分析,咱們能夠了解到poll的執行邏輯爲:

  1. 若是tail指向的節點的下一個節點(next域)爲null的話,說明tail指向的節點即爲隊列真正的隊尾節點,所以能夠經過casNext插入當前待插入的節點,但此時tail並未變化,如圖2;

  2. 若是tail指向的節點的下一個節點(next域)不爲null的話,說明tail指向的節點不是隊列的真正隊尾節點。經過q(Node<E> q = p.next)指針往前遞進去找到隊尾節點,而後經過casNext插入當前待插入的節點,並經過casTail方式更改tail,如圖3

咱們回過頭再來看p = (p != t && t != (t = tail)) ? t : q;這行代碼在單線程中,這段代碼永遠不會將p賦值爲t,那麼這麼寫就不會有任何做用,那咱們試着在多線程的狀況下進行分析。

多線程執行角度分析

多個線程offer

很顯然這麼寫另有深意,其實在多線程環境下這行代碼頗有意思的。 t != (t = tail)這個操做並不是一個原子操做,有這樣一種狀況:

5.線程A和線程B有可能的執行時序.png

如圖,假設線程A此時讀取了變量t,線程B恰好在這個時候offer一個Node後,此時會修改tail指針,那麼這個時候線程A再次執行t=tail時t會指向另一個節點,很顯然線程A先後兩次讀取的變量t指向的節點不相同,即t != (t = tail)爲true,而且因爲t指向節點的變化p != t也爲true,此時該行代碼的執行結果爲p和t最新的t指針指向了同一個節點,而且此時t也是隊列真正的對尾節點。那麼,如今已經定位到隊列真正的隊尾節點,就能夠執行offer操做了。

offer->poll->offer

那麼還剩下第11行的代碼咱們沒有分析,大體能夠猜測到應該就是回答一部分線程offer,一部分poll的這種狀況。當if (p == q)爲true時,說明p指向的節點的next也指向它本身,這種節點稱之爲哨兵節點這種節點在隊列中存在的價值不大,通常表示爲要刪除的節點或者是空節點。爲了可以很好的理解這種狀況,咱們先看看poll方法的執行過程後,再回過頭來看,總之這是一個頗有意思的事情 :)。

3.poll方法

poll方法源碼以下:

public E poll() {
    restartFromHead:
    1. for (;;) {
    2.    for (Node<E> h = head, p = h, q;;) {
    3.        E item = p.item;

    4.        if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
    5.            if (p != h) // hop two nodes at a time
    6.                updateHead(h, ((q = p.next) != null) ? q : p);
    7.            return item;
            }
    8.        else if ((q = p.next) == null) {
    9.            updateHead(h, p);
    10.            return null;
            }
    11.        else if (p == q)
    12.            continue restartFromHead;
            else
    13.            p = q;
        }
    }
}
複製代碼

咱們仍是先站在單線程的角度去理清該方法的基本邏輯。假設ConcurrentLinkedQueue初始狀態以下圖所示:

6.隊列初始狀態.png

參數offer時的定義,咱們仍是先將變量p做爲隊列要刪除真正的隊頭節點,h(head)指向的節點並不必定是隊列的隊頭節點。先來看poll出Node1時的狀況,因爲p=h=head,參照上圖,很顯然此時p指向的Node1的數據域不爲null,在第4行代碼中item!=null判斷爲true後接下來經過casItem將Node1的數據域設置爲null。若是CAS設置失敗則這次循環結束等待下一次循環進行重試。若第4行執行成功進入到第5行代碼,此時p和h都指向Node1,第5行if判斷爲false,而後直接到第7行return回Node1的數據域1,方法運行結束,此時的隊列狀態以下圖。

7.隊列出隊操做後的狀態.png

下面繼續從隊列中poll,很顯然當前h和p指向的Node1的數據域爲null,那麼第一件事就是要定位準備刪除的隊頭節點(找到數據域不爲null的節點)

定位刪除的隊頭節點

繼續看,第三行代碼item爲null,第4行代碼if判斷爲false,走到第8行代碼(q = p.next)if也爲false,因爲q指向了Node2,在第11行的if判斷也爲false,所以代碼走到了第13行,這個時候p和q共同指向了Node2,也就找到了要刪除的真正的隊頭節點。能夠總結出,定位待刪除的隊頭節點的過程爲:若是當前節點的數據域爲null,很顯然該節點不是待刪除的節點,就用當前節點的下一個節點去試探。在通過第一次循環後,此時狀態圖爲下圖:

8.通過一次循環後的狀態.png

進行下一次循環,第4行的操做同上述,當前假設第4行中casItem設置成功,因爲p已經指向了Node2,而h還依舊指向Node1,此時第5行的if判斷爲true,而後執行updateHead(h, ((q = p.next) != null) ? q : p),此時q指向的Node3,全部傳入updateHead方法的分別是指向Node1的h引用和指向Node3的q引用。updateHead方法的源碼爲:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
複製代碼

該方法主要是經過casHead將隊列的head指向Node3,而且經過 h.lazySetNext將Node1的next域指向它本身。最後在第7行代碼中返回Node2的值。此時隊列的狀態以下圖所示:

9.Node2從隊列中出隊後的狀態.png

Node1的next域指向它本身,head指向了Node3。若是隊列爲空隊列的話,就會執行到代碼的第8行(q = p.next) == null,if判斷爲true,所以在第10行中直接返回null。以上的分析是從單線程執行的角度去看,也可讓咱們瞭解poll的總體思路,如今來作一個總結:

  1. 若是當前head,h和p指向的節點的Item不爲null的話,說明該節點即爲真正的隊頭節點(待刪除節點),只須要經過casItem方法將item域設置爲null,而後將原來的item直接返回便可。

  2. 若是當前head,h和p指向的節點的item爲null的話,則說明該節點不是真正的待刪除節點,那麼應該作的就是尋找item不爲null的節點。經過讓q指向p的下一個節點(q = p.next)進行試探,若找到則經過updateHead方法更新head指向的節點以及構造哨兵節點(經過updateHead方法的h.lazySetNext(h)

接下來,按照上面分析offer的思惟方式,下面來分析一下多線程的狀況,第一種狀況是;

多線程執行狀況分析:

多個線程poll

如今回過頭來看poll方法的源碼,有這樣一部分:

else if (p == q)
    continue restartFromHead;
複製代碼

這一部分就是處理多個線程poll的狀況,q = p.next也就是說q永遠指向的是p的下一個節點,那麼什麼狀況下會使得p,q指向同一個節點呢?根據上面咱們的分析,只有p指向的節點在poll的時候轉變成了哨兵節點(經過updateHead方法中的h.lazySetNext)。當線程A在判斷p==q時,線程B已經將執行完poll方法將p指向的節點轉換爲哨兵節點而且head指向的節點已經發生了改變,因此就須要從restartFromHead處執行,保證用到的是最新的head。

poll->offer->poll

試想,還有這樣一種狀況,若是當前隊列爲空隊列,線程A進行poll操做,同時線程B執行offer,而後線程A在執行poll,那麼此時線程A返回的是null仍是線程B剛插入的最新的那個節點呢?咱們來寫一代demo:

public static void main(String[] args) {
    Thread thread1 = new Thread(() -> {
        Integer value = queue.poll();
        System.out.println(Thread.currentThread().getName() + " poll 的值爲:" + value);
        System.out.println("queue當前是否爲空隊列:" + queue.isEmpty());
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
        queue.offer(1);
    });
    thread2.start();
}
複製代碼

輸出結果爲:

Thread-0 poll 的值爲:null queue當前是否爲空隊列:false

經過debug控制線程thread1和線程thread2的執行順序,thread1先執行到第8行代碼if ((q = p.next) == null),因爲此時隊列爲空隊列if判斷爲true,進入if塊,此時先讓thread1暫停,而後thread2進行offer插入值爲1的節點後,thread2執行結束。再讓thread1執行,這時thread1並無進行重試,而是代碼繼續往下走,返回null,儘管此時隊列因爲thread2已經插入了值爲1的新的節點。因此輸出結果爲thread0 poll的爲null,然隊列不爲空隊列。所以,在判斷隊列是否爲空隊列的時候是不能經過線程在poll的時候返回爲null進行判斷的,能夠經過isEmpty方法進行判斷

4. offer方法中部分線程offer部分線程poll

在分析offer方法的時候咱們還留下了一個問題,即對offer方法中第11行代碼的理解。

offer->poll->offer

在offer方法的第11行代碼if (p == q),可以讓if判斷爲true的狀況爲p指向的節點爲哨兵節點,而何時會構造哨兵節點呢?在對poll方法的討論中,咱們已經找到了答案,即**當head指向的節點的item域爲null時會尋找真正的隊頭節點,等到待插入的節點插入以後,會更新head,而且將原來head指向的節點設置爲哨兵節點。**假設隊列初始狀態以下圖所示:

10.offer和poll相互影響分析時隊列初始狀態.png
所以在線程A執行offer時,線程B執行poll就會存在以下一種狀況:
11.線程A和線程B可能存在的執行時序.png

如圖,線程A的tail節點存在next節點Node1,所以會經過引用q往前尋找隊列真正的隊尾節點,當執行到判斷if (p == q)時,此時線程B執行poll操做,在對線程B來講,head和p指向Node0,因爲Node0的item域爲null,一樣會往前遞進找到隊列真正的隊頭節點Node1,在線程B執行完poll以後,Node0就會轉換爲哨兵節點,也就意味着隊列的head發生了改變,此時隊列狀態爲下圖。

12.線程B進行poll後隊列的狀態圖.png

此時線程A在執行判斷if (p == q)時就爲true,會繼續執行p = (t != (t = tail)) ? t : head;,因爲tail指針沒有發生改變因此p被賦值爲head,從新從head開始完成插入操做。

5. HOPS的設計

經過上面對offer和poll方法的分析,咱們發現tail和head是延遲更新的,二者更新觸發時機爲:

tail更新觸發時機:當tail指向的節點的下一個節點不爲null的時候,會執行定位隊列真正的隊尾節點的操做,找到隊尾節點後完成插入以後纔會經過casTail進行tail更新;當tail指向的節點的下一個節點爲null的時候,只插入節點不更新tail。

**head更新觸發時機:**當head指向的節點的item域爲null的時候,會執行定位隊列真正的隊頭節點的操做,找到隊頭節點後完成刪除以後纔會經過updateHead進行head更新;當head指向的節點的item域不爲null的時候,只刪除節點不更新head。

而且在更新操做時,源碼中會有註釋爲:hop two nodes at a time。因此這種延遲更新的策略就被叫作HOPS的大概緣由是這個(猜的 :)),從上面更新時的狀態圖能夠看出,head和tail的更新是「跳着的」即中間老是間隔了一個。那麼這樣設計的意圖是什麼呢?

若是讓tail永遠做爲隊列的隊尾節點,實現的代碼量會更少,並且邏輯更易懂。可是,這樣作有一個缺點,**若是大量的入隊操做,每次都要執行CAS進行tail的更新,彙總起來對性能也會是大大的損耗。若是能減小CAS更新的操做,無疑能夠大大提高入隊的操做效率,因此doug lea大師每間隔1次(tail和隊尾節點的距離爲1)進行才利用CAS更新tail。**對head的更新也是一樣的道理,雖然,這樣設計會多出在循環中定位隊尾節點,但整體來講讀的操做效率要遠遠高於寫的性能,所以,多出來的在循環中定位尾節點的操做的性能損耗相對而言是很小的。

參考資料

《java併發編程的藝術》

《Java高併發程序設計》

ConcurrentLinkedQueue博文:https://www.cnblogs.com/sunshine-2015/p/6067709.html

相關文章
相關標籤/搜索