【實戰Java高併發程序設計 1】Java中的指針:Unsafe類html
【實戰Java高併發程序設計 2】無鎖的對象引用:AtomicReferencenode
【實戰Java高併發程序設計 3】帶有時間戳的對象引用:AtomicStampedReference算法
【實戰Java高併發程序設計 4】數組也能無鎖:AtomicIntegerArray數組
【實戰Java高併發程序設計 5】讓普通變量也享受原子操做併發
【實戰Java高併發程序設計6】挑戰無鎖算法:無鎖的Vector實現函數
在對線程池的介紹中,提到了一個很是特殊的等待隊列SynchronousQueue。SynchronousQueue的容量爲0,任何一個對SynchronousQueue的寫須要等待一個對SynchronousQueue的讀,反之亦然。所以,SynchronousQueue與其說是一個隊列,不如說是一個數據交換通道。那SynchronousQueue的其妙功能是如何實現的呢?高併發
既然我打算在這一節中介紹它,那麼SynchronousQueue好比和無鎖的操做脫離不了關係。實際上SynchronousQueue內部也正是大量使用了無鎖工具。工具
對SynchronousQueue來講,它將put()和take()兩個功能大相徑庭的操做抽象爲一個共通的方法Transferer.transfer()。從字面上看,這就是數據傳遞的意思。它的完整簽名以下:post
Object transfer(Object e, boolean timed, long nanos)
當參數e爲非空時,表示當前操做傳遞給一個消費者,若是爲空,則表示當前操做須要請求一個數據。timed參數決定是否存在timeout時間,nanos決定了timeout的時長。若是返回值非空,則表示數據以及接受或者正常提供,若是爲空,則表示失敗(超時或者中斷)。spa
SynchronousQueue內部會維護一個線程等待隊列。等待隊列中會保存等待線程以及相關數據的信息。好比,生產者將數據放入SynchronousQueue時,若是沒有消費者接受,那麼數據自己和線程對象都會打包在隊列中等待(由於SynchronousQueue容積爲0,沒有數據能夠正常放入)。
Transferer.transfer()函數的實現是SynchronousQueue的核心,它大致上分爲三個步驟:
一、若是等待隊列爲空,或者隊列中節點的類型和本次操做是一致的,那麼將當前操做壓入隊列等待。好比,等待隊列中是讀線程等待,本次操做也是讀,所以這2個讀都須要等待。進入等待隊列的線程可能會被掛起,它們會等待一個「匹配」操做。
二、若是等待隊列中的元素和本次操做是互補的(好比等待操做是讀,而本次操做是寫),那麼就插入一個「完成」狀態的節點,而且讓他「匹配」到一個等待節點上。接着彈出這2個節點,而且使得對應的2個線程繼續執行。
三、若是線程發現等待隊列的節點就是「完成」節點。那麼幫助這個節點完成任務。其流程和步驟2是一致的。
步驟1的實現以下(代碼參考JDK 7u60):
SNode h = head; if (h == null || h.mode == mode) { // 若是隊列爲空,或者模式相同 if (timed && nanos <= 0) { // 不進行等待 if (h != null && h.isCancelled()) casHead(h, h.next); // 處理取消行爲 else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); //等待,直到有匹配操做出現 if (m == s) { // 等待被取消 clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // 幫助s的 fulfiller return (mode == REQUEST) ? m.item : s.item; } }
上述代碼中,第1行SNode表示等待隊列中的節點。內部封裝了當前線程、next節點、匹配節點、數據內容等信息。第2行,判斷當前等待隊列爲空,或者隊列中元素的模式與本次操做相同(好比,都是讀操做,那麼都必需要等待)。第8行,生成一個新的節點並置於隊列頭部,這個節點就表明當前線程。若是入隊成功,則執行第9行awaitFulfill()函數。該函數會進行自旋等待,並最終掛起當前線程。直到一個與之對應的操做產生,將其喚醒。線程被喚醒後(表示已經讀取到數據或者本身產生的數據已經被別的線程讀取),在14~15行嘗試幫助對應的線程完成兩個頭部節點的出隊操做(這僅僅是友情幫助)。並在最後,返回讀取或者寫入的數據(第16行)。
步驟2的實現以下:
} else if (!isFulfilling(h.mode)) { //是否處於fulfill狀態 if (h.isCancelled()) // 若是之前取消了 casHead(h, h.next); // 彈出並重試 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // 一直循環直到匹配(match)或者沒有等待者了 SNode m = s.next; // m 是 s的匹配者(match) if (m == null) { // 已經沒有等待者了 casHead(s, null); // 彈出fulfill節點 s = null; // 下一次使用新的節點 break; // 從新開始主循環 } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // 彈出s 和 m return (mode == REQUEST) ? m.item : s.item; } else // match 失敗 s.casNext(m, mn); // 幫助刪除節點 } } }
上述代碼中,首先判斷頭部節點是否處於fulfill模式。若是是,則須要進入步驟3。不然,將試本身爲對應的fulfill線程。第4行,生成一個SNode元素,設置爲fulfill模式並將其壓入隊列頭部。接着,設置m(原始的隊列頭部)爲s的匹配節點(第13行),這個tryMatch()操做將會激活一個等待線程,並將m傳遞給那個線程。若是設置成功,則表示數據投遞完成,將s和m兩個節點彈出便可(第14行)。若是tryMatch()失敗,則表示已經有其餘線程幫我完成了操做,那麼簡單得刪除m節點便可(第17行),由於這個節點的數據已經被投遞,不須要再次處理,而後,再次跳轉到第5行的循環體,進行下一個等待線程的匹配和數據投遞,直到隊列中沒有等待線程爲止。
步驟3:若是線程在執行時,發現頭部元素剛好是fulfill模式,它就會幫助這個fulfill節點儘快被執行:
} else { // 幫助一個 fulfiller SNode m =h.next; // m 是 h的 match if (m ==null) // 沒有等待者 casHead(h,null); // 彈出fulfill節點 else { SNode mn =m.next; if(m.tryMatch(h)) // 嘗試 match casHead(h, mn); // 彈出 h 和 m else // match失敗 h.casNext(m,mn); // 幫助刪除節點 } }
上述代碼的執行原理和步驟2是徹底一致的。惟一的不一樣是步驟3不會返回,由於步驟3所進行的工做是幫助其餘線程儘快投遞它們的數據。而本身並無完成對應的操做,所以,線程進入步驟3後,再次進入大循環體(代碼中沒有給出),從步驟1開始從新判斷條件和投遞數據。
從整個數據投遞的過程當中能夠看到,在SynchronousQueue中,參與工做的全部線程不只僅是競爭資源的關係。更重要的是,它們彼此之間還會相互幫助。在一個線程內部,可能會幫助其餘線程完成它們的工做。這種模式能夠更大程度上減小飢餓的可能,提升系統總體的並行度。
摘自《實戰Java高併發程序設計》