SynchronousQueue是一種特殊的阻塞隊列,不一樣於LinkedBlockingQueue、ArrayBlockingQueue和PriorityBlockingQueue,其內部沒有任何容量,任何的入隊操做都須要等待其餘線程的出隊操做,反之亦然。若是將SynchronousQueue用於生產者/消費者模式,那麼至關於生產者和消費者手遞手交易,即生產者生產出一個貨物,則必須等到消費者過來取貨,方可完成交易。
SynchronousQueue有一個fair選項,若是fair爲true,稱爲fair模式,不然就是unfair模式。fair模式使用一個先進先出的隊列保存生產者或者消費者線程,unfair模式則使用一個後進先出的棧保存。java
SynchronousQueue經過將入隊出隊的線程綁定到隊列的節點上,並藉助LockSupport的park()和unpark()實現等待,先到達的線程A需調用LockSupport的park()方法將當前線程進入阻塞狀態,知道另外一個與之匹配的線程B調用LockSupport.unpark(Thread)來喚醒在該節點上等待的線程A。
基本邏輯:node
在深刻分析其實現機制以前,咱們先了解對於SynchronousQueue可執行哪些操做,因爲SynchronousQueue的容量爲0,因此一些針對集合的操做,如:isEmpty()/size()/clear()/remove(Object)/contains(Object)等操做都是無心義的,一樣peek()也老是返回null。因此針對SynchronousQueue只有兩類操做:安全
這兩類操做內部都是調用Transferer的transfer(Object, boolean, long)方法,經過第一個參數是否爲null,來區分是生產者仍是消費者(生產者不爲null)。
針對以上狀況,咱們將着重分析Transferer的transfer(Object, boolean, long)方法,這裏因爲兩種不一樣的公平模式,會存在兩個Transferer的派生類:併發
public SynchronousQueue(boolean fair) { transferer = (fair)? new TransferQueue() : new TransferStack(); }
可見fair模式使用TransferQueue,unfair模式使用TransferStack,下面咱們將分別對這兩種模式進行着重分析。app
fair模式使用一個FIFO的隊列保存線程,TransferQueue的結構以下:oop
/** Dual Queue */ static final class TransferQueue extends Transferer { /** Node class for TransferQueue. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } ... } /** Head of queue */ transient volatile QNode head; /** Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } ... }
以上是TransferQueue的大體結構,能夠看到TransferQueue同一個普通的隊列,同時存在一個指向隊列頭部的指針——head,和一個指向隊列尾部的指針——tail;cleanMe的存在主要是解決不可清楚隊列的尾節點的問題,後面會介紹到;隊列的節點經過內部類QNode封裝,QNode包含四個變量:this
其餘的內容就是一些CAS變量以及操做,下面主要分析TransferQueue的三個重要方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。這三個方法是TransferQueue的核心,入口是transfer(),下面具體看代碼。atom
/** * @By Vicky:交換數據,生產者和消費者經過e==null來區分 */ Object transfer(Object e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null)? REQUEST : DATA;// 根據e==null判斷生產者仍是消費者,對應不一樣的mode值 for (;;) { SNode h = head; // 棧爲null或者棧頂元素的模式同當前模式,則進行入棧操做 if (h == null || h.mode == mode) { // empty or same-mode // 不等待,則直接返回null,返回以前順帶清理下被取消的元素 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) {// 入棧,更新棧頂爲新節點 // 等待,返回值m==s,則被取消,需清除 SNode m = awaitFulfill(s, timed, nanos); // m==s說明s被取消了,清除 if (m == s) { // wait was cancelled clean(s); return null; } // 幫忙出棧 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 消費者則返回生產者的數據,生產者則返回本身的數據 return mode == REQUEST? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill // 棧頂未開始匹配,則開始匹配 // h被取消,則出棧 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 更新棧頂爲新插入的節點,並更新節點的mode爲FULFILLING,對應判斷是否正在出棧的方法 // 匹配須要先將待匹配的節點入棧,因此不論是匹配仍是不匹配都須要建立一個節點入棧 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 循環直到找到一個能夠匹配的節點 for (;;) { // loop until matched or waiters disappear // m即與s匹配的節點 SNode m = s.next; // m is s's match // m==null說明棧s以後無元素了,直接將棧頂設置爲null,並從新進行最外層的循環 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 將s設置爲m的匹配節點,並更新棧頂爲m.next,即將s和m同時出棧 SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST)? m.item : s.item; } else // lost match // 設置匹配失敗,則說明m正準備出棧,幫助出棧 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 棧頂已開始匹配,幫助匹配 // 此處的操做邏輯同上面的操做邏輯一致,目的就是幫助上面進行操做,由於此處完成匹配須要分紅兩步: // a.m.tryMatch(s)和b.casHead(s, mn) // 因此必然會插入其餘線程,只要插入的線程也按照這個步驟執行那麼就避免了不一致問題 SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
從上面的代碼能夠看出TransferQueue.transfer()的總體流程:spa
下面看看具體如何讓一個線程進入阻塞。線程
/** *@ By Vicky:等待匹配,該方法會進入阻塞,直到三種狀況下才返回: * a.等待被取消了,返回值爲s * b.匹配上了,返回另外一個線程傳過來的值 * c.線程被打斷,會取消,返回值爲s */ Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { // timed==false,則不等待,lastTime==0便可 long lastTime = (timed)? System.nanoTime() : 0; // 當前線程 Thread w = Thread.currentThread(); // 循環次數,原理同自旋鎖,若是不是隊列的第一個元素則不自旋,由於壓根輪不上他,自旋只是浪費CPU // 若是等待的話則自旋的次數少些,不等待就多些 int spins = ((head.next == s) ? (timed? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted())// 支持打斷 s.tryCancel(e); // 若是s的item不等於e,有三種狀況: // a.等待被取消了,此時x==s // b.匹配上了,此時x==另外一個線程傳過來的值 // c.線程被打斷,會取消,此時x==s // 不論是哪一種狀況都不要再等待了,返回便可 Object x = s.item; if (x != e) return x; // 等到,直接超時取消 if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(e); continue; } } // 自旋,直到spins==0,進入等待 if (spins > 0) --spins; // 設置等待線程 else if (s.waiter == null) s.waiter = w; // 調用LockSupport.park進入等待 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
awaitFulfill()主要涉及自旋以及LockSupport.park()兩個關鍵點,自旋可去了解自旋鎖的原理。
自旋鎖原理:經過空循環則霸佔着CPU,避免當前線程進入睡眠,由於睡眠/喚醒是須要進行線程上下文切換的,因此若是線程睡眠的時間很段,那麼使用空循環可以避免線程進入睡眠的耗時,從而快速響應。可是因爲空循環會浪費CPU,因此也不能一直循環。自旋鎖通常適合同步快很小,競爭不是很激烈的場景。
LockSupport.park()可到API文檔進行了解。
下面再看看如何清除被取消的節點。
/** *@By Vicky:清除節點被取消的節點 */ void clean(QNode pred, QNode s) { s.waiter = null; // forget thread // 若是pred.next!=s則說明s已經出隊了 while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // 從隊列頭部開始遍歷,遇到被取消的節點則將其出隊 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail // t==h則隊列爲null if (t == h) return; QNode tn = t.next; if (t != tail) continue; // 幫助其餘線程入隊 if (tn != null) { advanceTail(t, tn); continue; } // 只能出隊非尾節點 if (s != t) { // If not tail, try to unsplice // 出隊方式很簡單,將pred.next指向s.next便可 QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } // 若是s是隊尾元素,那麼就須要cleanMe出場了,若是cleanMe==null,則只需將pred賦值給cleanMe便可, // 賦值cleanMe的意思是等到s不是隊尾時再進行清除,畢竟隊尾只有一個 // 同時將上次的cleanMe清除掉,正常狀況下此時的cleanMe已經不是隊尾了,由於當前須要清除的節點是隊尾 // (上面說的cleanMe實際上是須要清除的節點的前繼節點) QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; // d==null說明須要清除的節點已經沒了 // d==dp說明dp已經被清除了,那麼dp.next也一併被清除了 // 若是d未被取消,說明哪裏出錯了,將cleanMe清除,不清除這個節點了 // 後面括號將清除cleanMe的next出局,前提是cleanMe.next沒有已經被出局 if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); // dp==pred說明cleanMe.next已經其餘線程被更新了 if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
清除節點時有個原則:不能清除隊尾節點。因此若是對尾節點須要被清除,則將其保存到cleanMe變量,等待下次進行清除。在清除cleanMe時可能說的有點模糊,由於涉及到太多的併發會出現不少狀況,因此if條件太多,致使難以分析所有狀況。
以上就是TransferQueue的操做邏輯,下面看看後進先出的TransferStack。
unfair模式使用一個LIFO的隊列保存線程,TransferStack的結構以下:
/** Dual stack */ static final class TransferStack extends Transferer { /* Modes for SNodes, ORed together in node fields */ /** Node represents an unfulfilled consumer */ static final int REQUEST = 0;// 消費者請求數據 /** Node represents an unfulfilled producer */ static final int DATA = 1;// 生產者生產數據 /** Node is fulfilling another unfulfilled DATA or REQUEST */ static final int FULFILLING = 2;// 正在匹配中... /** 只須要判斷mode的第二位是否==1便可,==1則正在匹配中...*/ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** Node class for TransferStacks. */ static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } } /** The head (top) of the stack */ volatile SNode head; static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; } }
TransferStacks比TransferQueue的結構複雜些。使用一個head指向棧頂元素,使用內部類SNode封裝棧中的節點信息,SNode包含5個變量:
SNode的5個變量,三個是volatile的,另外兩個item和mode沒有volatile修飾,代碼註釋給出的解釋是:對這兩個變量的寫老是發生在volatile/原子操做的以前,讀老是發生在volatile/原子操做的以後。
上面提到SNode.mode的三個常量表示棧中節點的狀態,f分別爲:
其餘內部基本同TransferQueue,不一樣之處是當匹配到一個節點時並不是是將被匹配的節點出棧,而是將匹配的節點入棧,而後同時將匹配上的兩個節點一塊兒出棧。下面咱們參照TransferQueue來看看TransferStacks的三個方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。
/** * @By Vicky:交換數據,生產者和消費者經過e==null來區分 */ Object transfer(Object e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null)? REQUEST : DATA;// 根據e==null判斷生產者仍是消費者,對應不一樣的mode值 for (;;) { SNode h = head; // 棧爲null或者棧頂元素的模式同當前模式,則進行入棧操做 if (h == null || h.mode == mode) { // empty or same-mode // 不等待,則直接返回null,返回以前順帶清理下被取消的元素 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) {// 入棧,更新棧頂爲新節點 // 等待,返回值m==s,則被取消,需清除 SNode m = awaitFulfill(s, timed, nanos); // m==s說明s被取消了,清除 if (m == s) { // wait was cancelled clean(s); return null; } // 幫忙出棧 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 消費者則返回生產者的數據,生產者則返回本身的數據 return mode == REQUEST? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill // 棧頂未開始匹配,則開始匹配 // h被取消,則出棧 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 更新棧頂爲新插入的節點,並更新節點的mode爲FULFILLING,對應判斷是否正在出棧的方法 // 匹配須要先將待匹配的節點入棧,因此不論是匹配仍是不匹配都須要建立一個節點入棧 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 循環直到找到一個能夠匹配的節點 for (;;) { // loop until matched or waiters disappear // m即與s匹配的節點 SNode m = s.next; // m is s's match // m==null說明棧s以後無元素了,直接將棧頂設置爲null,並從新進行最外層的循環 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 將s設置爲m的匹配節點,並更新棧頂爲m.next,即將s和m同時出棧 SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST)? m.item : s.item; } else // lost match // 設置匹配失敗,則說明m正準備出棧,幫助出棧 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 棧頂已開始匹配,幫助匹配 // 此處的操做邏輯同上面的操做邏輯一致,目的就是幫助上面進行操做,由於此處完成匹配須要分紅兩步: // a.m.tryMatch(s)和b.casHead(s, mn) // 因此必然會插入其餘線程,只要插入的線程也按照這個步驟執行那麼就避免了不一致問題 SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
從上面的代碼能夠看出TransferStack.transfer()的總體流程:
下面看看TransferStack是如何讓一個線程進入阻塞。
/** *@ By Vicky:等待匹配,邏輯大體同TransferQueue可參考閱讀 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { long lastTime = (timed)? System.nanoTime() : 0; Thread w = Thread.currentThread(); SNode h = head; // 計算自旋的次數,邏輯大體同TransferQueue int spins = (shouldSpin(s)? (timed? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); // 若是s的match不等於null,有三種狀況: // a.等待被取消了,此時x==s // b.匹配上了,此時match==另外一個節點 // c.線程被打斷,會取消,此時x==s // 不論是哪一種狀況都不要再等待了,返回便可 SNode m = s.match; if (m != null) return m; if (timed) { // 等待 long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(); continue; } } // 自旋 if (spins > 0) spins = shouldSpin(s)? (spins-1) : 0; // 設置等待線程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 等待 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
邏輯基本同TransferQueue,不一樣之處是經過修改SNode的match變量標示匹配,以及取消。
下面再看看如何清除被取消的節點。
/** * @By Vicky:清除節點 */ void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread // 清除 SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head // 從棧頂節點開始清除,一直到遇到未被取消的節點,或者直到s.next SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes // 若是p自己未取消(上面的while碰到一個未取消的節點就會退出,但這個節點和past節點之間可能還有取消節點), // 再把p到past之間的取消節點都移除。 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }
以上即所有的TransferStack的操做邏輯。
看完了TransferQueue和TransferStack的邏輯,SynchronousQueue的邏輯基本清楚了。
SynchronousQueue的應用場景得看具體業務需求,J.U.C下有一個應用案例:Executors.newCachedThreadPool()就是使用SynchronousQueue做爲任務隊列。