在併發編程中咱們有時候須要使用線程安全的隊列。若是咱們要實現一個線程安全的隊列有兩種實現方式:一種是使用阻塞算法,另外一種是使用非阻塞算法。使用阻塞算法的隊列能夠用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不一樣的鎖)等方式來實現,而非阻塞的實現方式則可使用循環CAS的方式來實現,本文讓咱們一塊兒來研究下Doug Lea是如何使用非阻塞的方式來實現線程安全隊列ConcurrentLinkedQueue的,相信從大師身上咱們能學到很多併發編程的技巧。html
ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部,當咱們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free」算法來實現,該算法在Michael & Scott算法上進行了一些修改, Michael & Scott算法的詳細信息能夠參見參考資料一。java
咱們經過ConcurrentLinkedQueue的類圖來分析一下它的結構。算法
(圖1)編程
ConcurrentLinkedQueue由head節點和tair節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點的引用(next)組成,節點與節點之間就是經過這個next關聯起來,從而組成一張鏈表結構的隊列。默認狀況下head節點存儲的元素爲空,tair節點等於head節點。安全
private transient volatile Node<E> tail = head;
入隊列就是將入隊節點添加到隊列的尾部。爲了方便理解入隊時隊列的變化,以及head節點和tair節點的變化,每添加一個節點我就作了一個隊列的快照圖。併發
(圖二)spa
經過debug入隊過程並觀察head節點和tail節點的變化,發現入隊主要作兩件事情,第一是將入隊節點設置成當前隊列尾節點的下一個節點。第二是更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點,若是tail節點的next節點爲空,則將入隊節點設置成tail的next節點,因此tail節點不老是尾節點,理解這一點對於咱們研究源碼會很是有幫助。線程
上面的分析讓咱們從單線程入隊的角度來理解入隊過程,可是多個線程同時進行入隊狀況就變得更加複雜,由於可能會出現其餘線程插隊的狀況。若是有一個線程正在入隊,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲入隊節點,但這時可能有另一個線程插隊了,那麼隊列的尾節點就會發生變化,這時當前線程要暫停入隊操做,而後從新獲取尾節點。讓咱們再經過源碼來詳細分析下它是如何使用CAS算法來入隊的。debug
public boolean offer(E e) { if (e == null) throw new NullPointerException(); //入隊前,建立一個入隊節點 Node<E> n = new Node<E>(e); retry: //死循環,入隊不成功反覆入隊。 for (;;) { //建立一個指向tail節點的引用 Node<E> t = tail; //p用來表示隊列的尾節點,默認狀況下等於tail節點。 Node<E> p = t; for (int hops = 0; ; hops++) { //得到p節點的下一個節點。 Node<E> next = succ(p); //next節點不爲空,說明p不是尾節點,須要更新p後在將它指向next節點 if (next != null) { //循環了兩次及其以上,而且當前節點仍是不等於尾節點 if (hops > HOPS && t != tail) continue retry; p = next; } //若是p是尾節點,則設置p節點的next節點爲入隊節點。 else if (p.casNext(null, n)) { //若是tail節點有大於等於1個next節點,則將入隊節點設置成tair節點,更新失敗了也 不要緊,由於失敗了表示有其餘線程成功更新了tair節點。 if (hops >= HOPS) casTail(t, n); // 更新tail節點,容許失敗 return true; } // p有next節點,表示p的next節點是尾節點,則從新設置p節點 else { p = succ(p); } } } }
從源代碼角度來看整個入隊過程主要作二件事情。第一是定位出尾節點,第二是使用CAS算法能將入隊節點設置成尾節點的next節點,如不成功則重試。設計
第一步定位尾節點。tail節點並不老是尾節點,因此每次入隊都必須先經過tail節點來找到尾節點,尾節點可能就是tail節點,也多是tail節點的next節點。代碼中循環體中的第一個if就是判斷tail是否有next節點,有則表示next節點多是尾節點。獲取tail節點的next節點須要注意的是p節點等於p的next節點的狀況,只有一種可能就是p節點和p的next節點都等於空,表示這個隊列剛初始化,正準備添加第一次節點,因此須要返回head節點。獲取p節點的next節點代碼以下
final Node<E> succ(Node<E> p) { Node<E> next = p.getNext(); return (p == next) ? head : next; }
第二步設置入隊節點爲尾節點。p.casNext(null, n)方法用於將入隊節點設置爲當前隊列尾節點的next節點,p若是是null表示p是當前隊列的尾節點,若是不爲null表示有其餘線程更新了尾節點,則須要從新獲取當前隊列的尾節點。
hops的設計意圖。上面分析過對於先進先出的隊列入隊所要作的事情就是將入隊節點設置成尾節點,doug lea寫的代碼和邏輯仍是稍微有點複雜。那麼我用如下方式來實現行不行?
public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); for (;;) { Node<E> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } }
讓tail節點永遠做爲隊列的尾節點,這樣實現代碼量很是少,並且邏輯很是清楚和易懂。可是這麼作有個缺點就是每次都須要使用循環CAS更新tail節點。若是能減小CAS更新tail節點的次數,就能提升入隊的效率,因此doug lea使用hops變量來控制並減小tail節點的更新頻率,並非每次節點入隊後都將 tail節點更新成尾節點,而是當 tail節點和尾節點的距離大於等於常量HOPS的值(默認等於1)時才更新tail節點,tail和尾節點的距離越長使用CAS更新tail節點的次數就會越少,可是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,由於循環體須要多循環一次來定位出尾節點,可是這樣仍然能提升入隊的效率,由於從本質上來看它經過增長對volatile變量的讀操做來減小了對volatile變量的寫操做,而對volatile變量的寫操做開銷要遠遠大於讀操做,因此入隊效率會有所提高。
private static final int HOPS = 1;
還有一點須要注意的是入隊方法永遠返回true,因此不要經過返回值判斷入隊是否成功。
出隊列的就是從隊列裏返回一個節點元素,並清空該節點對元素的引用。讓咱們經過每一個節點出隊的快照來觀察下head節點的變化。
從上圖可知,並非每次出隊時都更新head節點,當head節點裏有元素時,直接彈出head節點裏的元素,而不會更新head節點。只有當head節點裏沒有元素時,出隊操做纔會更新head節點。這種作法也是經過hops變量來減小使用CAS更新head節點的消耗,從而提升出隊效率。讓咱們再經過源碼來深刻分析下出隊過程。
public E poll() { Node<E> h = head; // p表示頭節點,須要出隊的節點 Node<E> p = h; for (int hops = 0;; hops++) { // 獲取p節點的元素 E item = p.getItem(); // 若是p節點的元素不爲空,使用CAS設置p節點引用的元素爲null,若是成功則返回p節點的元素。 if (item != null && p.casItem(item, null)) { if (hops >= HOPS) { //將p節點下一個節點設置成head節點 Node<E> q = p.getNext(); updateHead(h, (q != null) ? q : p); } return item; } // 若是頭節點的元素爲空或頭節點發生了變化,這說明頭節點已經被另一個線程修改了。那麼獲取p節點的下一個節點 Node<> next = succ(p); // 若是p的下一個節點也爲空,說明這個隊列已經空了 if (next == null) { // 更新頭節點。 updateHead(h, p); break; } // 若是下一個元素不爲空,則將頭節點的下一個節點設置成頭節點 p = next; } return null; }
首先獲取頭節點的元素,而後判斷頭節點元素是否爲空,若是爲空,表示另一個線程已經進行了一次出隊操做將該節點的元素取走,若是不爲空,則使用CAS的方式將頭節點的引用設置成null,若是CAS成功,則直接返回頭節點的元素,若是不成功,表示另一個線程已經進行了一次出隊操做更新了head節點,致使元素髮生了變化,須要從新獲取頭節點。