public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
上面是SynchronousQueue的兩個構造方法,能夠看出SynchronousQueue的底層實際是建立一個轉移隊列,而且這個隊列有公平和非公平兩種模式,下面咱們先來看看公平模式的轉移隊列是怎麼實現的。java
2.SynchronousQueue的公平模式node
不管是非公平的TransferStack隊列仍是公平的TransferQueue隊列,都是對父類Transfer的實現:安全
//Transferer是SynchronousQueue中的一個內部類,定義了一個轉移方法 //TransferStack和TransferQueue都要實現該轉移方法 abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); }
TransferQueue的是一個底層由鏈表實現的FIFO隊列,其結點的定義以下:數據結構
static final class QNode { volatile QNode next; // 結點的後繼 volatile Object item; // 結點中存儲的數據 volatile Thread waiter; // 等待的線程 //是不是數據,此標識要與判斷入隊的操做是什麼操做 //即判斷相鄰兩次入隊的操做是否相同,如果相鄰兩次入隊 //的是不相同的操做(一次put一次take)那麼就要進行配對後移除出隊。 //不然,將操做入隊便可 final boolean isData; //構造方法 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } //CAS更新後繼結點 boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } //CAS更新結點中的數據 boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } //取消操做,即嘗試將item更新爲結點自己 void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } //判斷節點是否應該取消,即item是否是結點自己 boolean isCancelled() { return item == this; } //判斷當前結點是否已再也不隊列之中 boolean isOffList() { return next == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
知道了TransferQueue的底層實現,在看看TransferQueue中重要的屬性及構造方法:併發
static final class TransferQueue<E> extends Transferer<E> { //隊列的隊首結點 transient volatile QNode head; //隊列的隊尾結點 transient volatile QNode tail; //清除標記 transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } } }
3.公平模式下的take和put操做app
//將指定元素添加到此隊列,若有必要則等待另外一個線程接收它 public void put(E e) throws InterruptedException { //判斷轉移元素是否爲null,也就是說同步隊列中不能對null元素進行轉移 //由於e元素是否爲null,是transfer方法中判斷入隊操做是put仍是take //的一個依據 if (e == null) throw new NullPointerException(); //put方法中傳入transfer的e不爲null //調用transfer 默認不進行超時等待,若是發生中斷則拋出中斷異常 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } //獲取並移除此隊列的頭,若有必要則等待另外一個線程插入它 public E take() throws InterruptedException { //take方法中傳入transfer方法的e爲null E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
由take方法和put方法的源碼可知,在公平模式下,這兩個方法本質都是對TransferQueue中transfer方法的調用,下面來看看TransferQueue中的transfer方法是如何實現的?oop
E transfer(E e, boolean timed, long nanos) { QNode s = null; //根據e是否爲null,標記本次入隊的是什麼操做 //e爲null,isData爲false,不然isData爲true boolean isData = (e != null); for (;;) { //自旋 QNode t = tail; //獲取隊尾 QNode h = head; //獲取隊首 //判斷隊首或隊尾是否爲null,實際上h和t不會爲null(構造方法中已經初始化過) if (t == null || h == null) // saw uninitialized value continue; // spin //判斷是不是空隊列,或者入隊操做與隊尾相同(即與隊尾都是put或者take操做) //不管是空隊列仍是與隊尾相同操做,說明只能將操做入隊,而不能進行 //匹配(操做相同,沒法配對,配對要操做不一樣才行) if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; //獲取隊尾的後繼結點 //判斷隊尾是否已經改變,便是否有其餘線程搶先進行入隊或者配對操做 if (t != tail) // inconsistent read continue; //從新進行入隊或者配對判斷 //判斷是否是有新增的與隊尾相同的入隊操做,如果,更新成新的隊尾 if (tn != null) { // lagging tail advanceTail(t, tn); //嘗試更新隊尾結點,失敗也沒有關係 continue; } //判斷是否超時(timed爲是否設置了超時等待操做,nanos爲剩餘的等待時間) if (timed && nanos <= 0) // can't wait return null; //超時後直接返回null //判斷s是否爲null,如果則以e爲item建立一個結點 if (s == null) s = new QNode(e, isData); //嘗試更新隊尾的後繼結點爲s結點,失敗的話就循環繼續嘗試直到成功 if (!t.casNext(null, s)) // failed to link in continue; //在隊尾新增了一個後繼結點,那麼隊尾就應該是這個後繼結點了 //所以須要將s更新爲新的隊尾結點 advanceTail(t, s); // swing tail and wait //空旋或者阻塞直到匹配的操做到來 Object x = awaitFulfill(s, e, timed, nanos); //到這一步,說明阻塞的操做已經配對成功或者操做已經被取消了,線程被喚醒了 //判斷是配對成功了,仍是操做被取消了 //如果操做被取消,會設置s.item=s if (x == s) { // wait was cancelled clean(t, s); //清除結點s return null; //操做已經被取消,直接返回null } //判斷結點s是否還在隊列中 //如果還在隊列中,且又配對操做成功,說明s結點應該是新的head //而且若s.item不爲null還須要將s.item設爲s自身,等待線程賦null if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head //判斷是否 if (x != null) s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 入隊的操做與以前的隊尾的操做不一樣,能夠進行配對(take配put,或 // put配take) } else { //獲取隊首的後繼結點 QNode m = h.next; //出現t與隊尾不一樣,m爲null,h與隊首不一樣,說明隊列發生了改變 //即隊列出現了其餘線程搶先執行了入隊或者配對的操做 if (t != tail || m == null || h != head) continue; //循環從新來 //獲取後繼結點的item Object x = m.item; //isData == (x != null)是判斷m結點對應的操做與當前操做是否相同 //x == m 則是判斷m結點是否被取消 //!m.casItem(x, e) 則是判斷嘗試更新m結點的item爲e是否成功 //以上三個判斷有一個爲真,那就說明m結點已經不在隊列中或是被取消或是匹配過了 if (isData == (x != null) || x == m || !m.casItem(x, e)) { advanceHead(h, m); //舊隊首已通過時,更新隊首 continue; } //更新head,到此處說明配對操做已經成功,應該將m結點變爲head //而h結點則須要移除出列 advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); //喚醒m結點對應的線程 // return (x != null) ? (E)x : e; } } } //嘗試更新隊尾結點 void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } //等待結點被對應的操做匹配 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { //根據timed標識即超時時間計算截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //湖區當前線程的引用 //計算出自旋時間 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); //死循環,確保操做能成功 for (;;) { //判斷當前線程是否被中斷,如果被中斷那麼取消 if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; //獲取結點對應的item對象 //判斷結點的item是否仍是對象e //生成s節點的時候,s.item是等於e的,當取消操做(item變爲s)或者 //匹配了操做的時候會進行更改 if (x != e) return x; //判斷是否設置了超時等待功能 if (timed) { nanos = deadline - System.nanoTime(); //計算剩餘的等待時間 if (nanos <= 0L) { //判斷是否應超時,若超時直接嘗試取消操做 s.tryCancel(e); continue; } } //自旋時間控制 if (spins > 0) --spins; //自旋時間減小 //判斷是否須要設置等待線程 else if (s.waiter == null) s.waiter = w; else if (!timed) //沒有設置超時等待功能,直接讓讓線程一直掛起 LockSupport.park(this); //設置了超時等待功能,就判斷等待時間是否大於自旋的最大時間 //若大於自旋最大時間那就讓線程阻塞一段時間 //不然讓線程自旋一段時間 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } //嘗試取消當前結點對應的操做,即將結點中item值更新成結點自身(this) void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } //更新隊首head指針,並將原隊首結點的next指向其自身,方便GC void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next } //如果某個結點對應的操做被取消,那麼這個操做對應的結點須要移除出隊 //也就是清理無效結點。 /** * clean方法中若是刪除的節點不是尾節點,那麼能夠直接進行刪除, * 若是刪除的節點是尾節點,那麼用cleanMe標記須要刪除的節點的前驅, * 這樣在下一輪的clean的過程將會清除打了標記的節點。 */ void clean(QNode pred, QNode s) { s.waiter = null; //操做已經被取消,那麼就不會有等待線程了 //判斷s是不是pred的後繼結點 while (pred.next == s) { // Return early if already unlinked QNode h = head; //隊首引用 QNode hn = h.next; //隊首的後繼結點 //判斷hn是否被取消,若被取消,那麼更新head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // 隊尾結點 //判斷是不是空隊列 if (t == h) return; QNode tn = t.next; //隊尾的後繼 //隊尾改變,說明隊列被其餘線程修改過了 if (t != tail) continue; //t有後繼結點,說明隊尾該更新了 if (tn != null) { advanceTail(t, tn); continue; } //判斷s結點是不是隊尾結點 //若不是隊尾結點,則只需將s的前驅pred結點的next指向s結點的後繼結點sn //即成功將s結點從隊列中清除 if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } //到此處,說明要清除的結點就是隊列的隊尾,而隊尾不能直接刪除 QNode dp = cleanMe; //獲取標識有要刪除的結點的前驅結點 //判斷是否有須要刪除的結點,dp不爲null,即cleanMe存在 //說明隊列以前有隊尾結點須要刪除 if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; //當cleanMe處於如下四種情形時,cleanMe失效 //(1)cleanMe的後繼而空(cleanMe 標記的是須要刪除節點的前驅) //(2)cleanMe的後繼等於自身, //(3)須要刪除節點的操做沒有被取消, //(4)被刪除的節點不是尾節點且其後繼節點有效 if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); //cleanMe失效,賦值爲null if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) //將隊尾的前驅結點標記成cleanMe return; // Postpone cleaning s } } //用於判斷結點是否還在隊列中,若後繼結點是自身,說明已經再也不隊列中 //不然還在隊列中 boolean isOffList() { return next == this; }
由上面對公平的SynchronousQueue的分析可知,底層是使用隊列來實現公平模式的,而且線程安全是經過CAS方式實現的。公平的TransferQueue隊列中會將連續相同的操做入隊,而不一樣的操做則會進行配對,即TransferQueue隊列中要麼沒有存放操做,要麼存放都是相同的操做(要麼都是take,要麼都是put),當有一個與隊列中的操做不相同的操做時,隊列會自動將隊首操做與之進行匹配。大體流程(操做被取消未顯示,方便理解)以下圖所示:學習
4.SynchronousQueue的非公平模式this
SynchronousQueue的非公平模式是基於棧來實現的,咱們知道棧是後進先出的(LIFO),也就是說這裏的非公平模式與ReentrantLock中的非公平模式區別巨大,後來先服務這太不公平了。spa
先來看看非公平模式的具體實現TransferStack的底層數據結構鏈表中結點的定義:
static final class SNode { volatile SNode next; // 鏈表的後繼結點 volatile SNode match; // 匹配的結點 volatile Thread waiter; // 等待的線程 Object item; // 數據 int mode; //結點的模式 SNode(Object item) { this.item = item; } //CAS方式更新後繼結點 boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } //嘗試匹配結點,匹配成功就將等待匹配結點對應的線程喚醒繼續後續操做 boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } return true; } return match == s; } //嘗試取消結點,將match更新成結點自身即標誌着結點已處於取消狀態 void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } //判斷是否被取消 boolean isCancelled() { return match == this; } private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
知道告終點的基本結構,在來看看TransferStack中的重要屬性及構造方法:
//TransferStack中沒有構造方法,所以只有一個空構造 /** * TransferStack中定義了三個標記:REQUEST表示消費者,DATA表示生產者, * FULFILLING表示操做匹配狀態。任何線程對TransferStack的操做都屬於 * 上述3種狀態中的一種 */ static final class TransferStack<E> extends Transferer<E> { //用於標記結點的類型,表明結點是消費者(對應take操做) static final int REQUEST = 0; //用於標記結點的類型,表明結點是生產者(對應put操做) static final int DATA = 1; //用於標記結點的狀態,表明結點正處於匹配狀態 static final int FULFILLING = 2; //棧頂結點 volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } }
5.非公平模式下的take和put操做
有上文對公平模式的學習,咱們知道take和put操做最終調用的都是transfer方法,只不過公平模式調用的是TransferQueue中的轉移方法,非公平模式則是調用TransferStack中的轉移方法
/** * transfer的大體過程:將一個操做與棧頂的操做進行配對 * 如果配對不成功(take對應put,或put對應take),那麼直接將該操做 * 入棧;如果配對成功,即此時應該將棧頂操做出棧,可是不能直接出棧( * 若此時其餘線程進行入棧,那麼直接出棧會出問題),而是先將匹配的操做 * 標記成FULFILLING狀態(匹配狀態)而後入棧;當其餘線程檢查到這個匹配 * 的過程,就會先幫助配對,在去執行自身的操做 */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed //判斷當前操做是何種模式,REQUEST對應take,DATA對應put int mode = (e == null) ? REQUEST : DATA; for (;;) { //死循環 SNode h = head; //獲取棧頂 //判斷棧是否爲空棧,若不爲空棧,那麼棧頂操做模式與當前操做模式是否相同 //如果棧頂操做與當前操做模式相同,那麼就須要入棧該操做 if (h == null || h.mode == mode) { // empty or same-mode //判斷是否設置了超時等待,若設置了超時等待,那等待時間是否還有剩餘 if (timed && nanos <= 0) { //判斷棧頂是否爲null,且棧頂是否被取消 //若不爲null,且棧頂操做被取消,那麼就嘗試更新棧頂操做爲其後繼 if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else //棧頂爲null或者沒被取消 return null; //沒有設置超時等待,或超時等待時間還未到,則嘗試將當前操做入棧 } else if (casHead(h, s = snode(s, e, h, mode))) { //s入棧成功,那麼就自旋或掛起等待匹配的操做到來在被喚醒 SNode m = awaitFulfill(s, timed, nanos); //操做自旋或掛起 //到這一步說明操做已經被取消,或者匹配到了對應的操做 //匹配結點m爲結點本身自己,說明操做被取消 if (m == s) { // wait was cancelled clean(s); //清除被取消的結點 return null; } //如果棧不是空棧且棧頂的後繼是s,說明操做s和其匹配的操做m,都還在棧中 //須要將其移除出棧,即更新棧頂爲s的後繼 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller //根據操做類型返回對應的數據,最後返回的其實都是put進去的數據 return (E) ((mode == REQUEST) ? m.item : s.item); } //當前的操做與棧頂操做相匹配,進行匹配,將操做的狀態更新成FULFILLING併入棧 } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // 判斷棧頂是否被取消 casHead(h, h.next); // 棧頂被取消更新棧頂 //入棧新結點s,s的後繼爲h,而且s處於FULFILLING狀態(匹配狀態) else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // s的後繼m,由於s處於匹配狀態,m多是其配對的結點 //配對結點爲null //這裏有些疑問,沒搞懂什麼情形下會出現m爲null(按理不該該出現的) if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } //m的後繼結點 SNode mn = m.next; //判斷m和s配對是否成功,配對成功嘗試更新棧頂爲m的後繼 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match //沒有匹配成功,說明m結點不是s的配對結點,繼續向後尋找 s.casNext(m, mn); // help unlink } } //到這說明棧頂處於匹配狀態,那麼幫助其匹配 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } //將結點對應的線程自旋或掛起以等待匹配的操做到來 SNode awaitFulfill(SNode s, boolean timed, long nanos) { //計算截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //計算自旋時間 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); //嘗試取消操做 SNode m = s.match; //獲取匹配的結點 //匹配結點不爲null,說明要麼匹配到了,要麼被取消,均可以結束掛起了 if (m != null) return m; //判斷是否設置了超時等待 if (timed) { //計算剩餘時間 nanos = deadline - System.nanoTime(); //設置了超時等待,若截止時間已到,那麼操做要被取消 if (nanos <= 0L) { s.tryCancel(); continue; } } //自旋時間遞減 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; //設置等待線程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter //沒有設置超時等待,線程一直掛起,知道被配對操做喚醒 else if (!timed) LockSupport.park(this); //線程掛起 //判斷超時等待時間是否大於自旋最大時間 //如果大於,就直接將線程掛起一段時間 //不然只自旋不掛起 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } //須要自旋的情形 //當前結點爲棧頂 //棧頂爲null //棧頂正在匹配中 boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); } //嘗試配對 boolean tryMatch(SNode s) { //嘗試將當前結點的配對結點更新爲s,更新成功就喚醒當前結對應的線程 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } return true; } return match == s; }
經過上面對非公平模式下TransferStack中transfer方法的分析,可知非公平模式實際上能夠說是很是的不公平,由於TransferStack是利用棧的後進先出性質來進行配對的,也就說基本上都是後來先服務。其大體過程能夠簡化成以下圖所示: