本文首發於一世流雲專欄: https://segmentfault.com/blog...
SynchronousQueue
是JDK1.5時,隨着J.U.C包一塊兒引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於棧和隊列實現:html
沒有看錯,SynchronousQueue的底層實現包含兩種數據結構——棧和隊列。這是一種很是特殊的阻塞隊列,它的特色簡要歸納以下:java
注意:上述的特色1,和咱們以前介紹的Exchanger其實很是類似,能夠類比Exchanger的功能來理解。
以前提到,SynchronousQueue根據公平/非公平訪問策略的不一樣,內部使用了兩種不一樣的數據結構:棧和隊列。咱們先來看下對象的構造,SynchronousQueue只有2種構造器:node
/** * 默認構造器. * 默認使用非公平策略. */ public SynchronousQueue() { this(false); }
/** * 指定策略的構造器. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
能夠看到,對於公平策略,內部構造了一個TransferQueue對象,而非公平策略則是構造了TransferStack對象。這兩個類都繼承了內部類Transferer,SynchronousQueue中的全部方法,其實都是委託調用了TransferQueue/TransferStack的方法:算法
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * tranferer對象, 構造時根據策略類型肯定. */ private transient volatile Transferer<E> transferer; /** * Shared internal API for dual stacks and queues. */ abstract static class Transferer<E> { /** * Performs a put or take. * * @param e 非null表示 生產者 -> 消費者; * null表示, 消費者 -> 生產者. * @return 非null表示傳遞的數據; null表示傳遞失敗(超時或中斷). */ abstract E transfer(E e, boolean timed, long nanos); } /** * Dual stack(雙棧結構). * 非公平策略時使用. */ static final class TransferStack<E> extends Transferer<E> { // ... } /** * Dual Queue(雙端隊列). * 公平策略時使用. */ static final class TransferQueue<E> extends Transferer<E> { // ... } // ... }
非公平策略由TransferStack類實現,既然TransferStack是棧,那就有結點。TransferStack內部定義了名爲SNode的結點:segmentfault
static final class SNode { volatile SNode next; volatile SNode match; // 與當前結點配對的結點 volatile Thread waiter; // 當前結點對應的線程 Object item; // 實際數據或null int mode; // 結點類型 SNode(Object item) { this.item = item; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } // ... }
上述SNode結點的定義中有個mode
字段,表示結點的類型。TransferStack一共定義了三種結點類型,任何線程對TransferStack的操做都會建立下述三種類型的某種結點:性能優化
static final class TransferStack<E> extends Transferer<E> { /** * 未配對的消費者 */ static final int REQUEST = 0; /** * 未配對的生產者 */ static final int DATA = 1; /** * 配對成功的消費者/生產者 */ static final int FULFILLING = 2; volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } // ... }
SynchronousQueue的入隊操做調用了put方法:數據結構
/** * 入隊指定元素e. * 若是沒有另外一個線程進行出隊操做, 則阻塞該入隊線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
SynchronousQueue的出隊操做調用了take方法:併發
/** * 出隊一個元素. * 若是沒有另外一個線程進行出隊操做, 則阻塞該入隊線程. */ public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
能夠看到,SynchronousQueue同樣不支持null元素,實際的入隊/出隊操做都是委託給了transfer方法,該方法返回null表示出/入隊失敗(一般是線程被中斷或超時):框架
/** * 入隊/出隊一個元素. */ E transfer(E e, boolean timed, long nanos) { SNode s = null; // s表示新建立的結點 // 入參e==null, 說明當前是出隊線程(消費者), 不然是入隊線程(生產者) // 入隊線程建立一個DATA結點, 出隊線程建立一個REQUEST結點 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { // 自旋 SNode h = head; if (h == null || h.mode == mode) { // CASE1: 棧爲空 或 棧頂結點類型與當前mode相同 if (timed && nanos <= 0) { // case1.1: 限時等待的狀況 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當前結點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當前調用線程 if (m == s) { // 阻塞過程當中被中斷 clean(s); return null; } // 此時m爲配對結點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結點的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 執行到此處說明入棧失敗(多個線程同時入棧致使CAS操做head失敗),則進入下一次自旋繼續執行 } else if (!isFulfilling(h.mode)) { // CASE2: 棧頂結點還未配對成功 if (h.isCancelled()) // case2.1: 元素取消狀況(因中斷或超時)的處理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 將當前結點壓入棧中 for (; ; ) { SNode m = s.next; // s.next指向原棧頂結點(也就是與當前結點匹配的結點) if (m == null) { // m==null說明被其它線程搶先匹配了, 則跳出循環, 從新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 進行結點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結點所有彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失敗 s.casNext(m, mn); // 移除原待匹配結點 } } } else { // CASE3: 其它線程正在匹配 SNode m = h.next; if (m == null) // 棧頂的next==null, 則直接彈出, 從新進入下一次自旋 casHead(h, null); else { // 嘗試和其它線程競爭匹配 SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); // 匹配成功 else h.casNext(m, mn); // 匹配失敗(被其它線程搶先匹配成功了) } } } }
整個transfer方法考慮了限時等待的狀況,且入隊/出隊其實都是調用了同一個方法,其主幹邏輯就是在一個自旋中完成如下三種狀況之一的操做,直到成功,或者被中斷或超時取消:性能
爲了便於理解,咱們來看下面這個調用示例(假設不考慮限時等待的狀況),假設一共有三個線程ThreadA、ThreadB、ThreadC:
①初始棧結構
初始棧爲空,head
爲棧頂指針,始終指向棧頂結點:
②ThreadA(生產者)執行入隊操做
因爲此時棧爲空,因此ThreadA會進入CASE1,建立一個類型爲DATA
的結點:
if (h == null || h.mode == mode) { // CASE1: 棧爲空 或 棧頂結點類型與當前mode相同 if (timed && nanos <= 0) { // case1.1: 限時等待的狀況 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當前結點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當前調用線程 if (m == s) { // 阻塞過程當中被中斷 clean(s); return null; } // 此時m爲配對結點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結點的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 執行到此處說明入棧失敗(多個線程同時入棧致使CAS操做head失敗),則進入下一次自旋繼續執行 }
CASE1分支中,將結點壓入棧後,會調用awaitFulfill
方法,該方法會阻塞調用線程:
/** * 阻塞當前調用線程, 並將線程信息記錄在s.waiter字段上. * * @param s 等待的結點 * @return 返回配對的結點 或 當前結點(說明線程被中斷了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能優化操做(計算自旋次數) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存當前結點的匹配結點. * s.match==null說明尚未匹配結點 * s.match==s說明當前結點s對應的線程被中斷了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 尚未匹配結點, 則保存當前線程 s.waiter = w; // s.waiter保存當前阻塞線程 else if (!timed) LockSupport.park(this); // 阻塞當前線程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
此時棧結構以下,結點的waiter
字段保存着建立該結點的線程ThreadA,ThreadA等待着被配對消費者線程喚醒:
③ThreadB(生產者)執行入隊操做
此時棧頂結點的類型和ThreadB建立的結點相同(都是DATA
類型的結點),因此依然走CASE1分支,直接將結點壓入棧:
④ThreadC(消費者)執行出隊操做
此時棧頂結點的類型和ThreadC建立的結點匹配(棧頂DATA
類型,ThreadC建立的是REQUEST
類型),因此走CASE2分支,該分支會將匹配的兩個結點彈出棧:
else if (!isFulfilling(h.mode)) { // CASE2: 棧頂結點還未配對成功 if (h.isCancelled()) // case2.1: 元素取消狀況(因中斷或超時)的處理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 將當前結點壓入棧中 for (; ; ) { SNode m = s.next; // s.next指向原棧頂結點(也就是與當前結點匹配的結點) if (m == null) { // m==null說明被其它線程搶先匹配了, 則跳出循環, 從新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 進行結點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結點所有彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失敗 s.casNext(m, mn); // 移除原待匹配結點 } } }
上述isFulfilling方法就是判斷結點是否匹配:
/** * 判斷m是否已經配對成功. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
ThreadC建立結點並壓入棧後,棧的結構以下:
此時,ThreadC會調用tryMatch方法進行匹配,該方法的主要做用有兩點:
match
字段置爲與當前配對的結點(如上圖中,結點m是待配對結點,最終m.math == s
)/** * 嘗試將當前結點和s結點配對. */ boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // 喚醒當前結點對應的線程 waiter = null; LockSupport.unpark(w); } return true; } return match == s; // 配對成功返回true }
匹配完成後,會將匹配的兩個結點彈出棧,並返回匹配值:
if (m.tryMatch(s)) { // 進行結點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結點所有彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 }
最終,ThreadC拿到了等待配對結點中的數據並返回,此時棧的結構以下:
注意: CASE2分支中ThreadC建立的結點的mode值並非REQUEST,其mode值爲FULFILLING | mode
,FULFILLING | mode
的主要做用就是給棧頂結點置一個標識(二進制爲11或10), 表示當前有線程正在對棧頂匹配,這時若是有其它線程進入自旋(併發狀況),則CASE2必定失敗,由於isFulfilling
的結果必然爲true,因此會進入 CASE3分支——跳過棧頂結點進行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))
⑤ThreadB(生產者)喚醒後繼續執行
ThreadB被喚醒後,會從原阻塞處繼續執行,並進入下一次自旋,在下一次自旋中,因爲結點的match
字段已經有了匹配結點,因此直接返回配對結點:
/** * 阻塞當前調用線程, 並將線程信息記錄在s.waiter字段上. * * @param s 等待的結點 * @return 返回配對的結點 或 當前結點(說明線程被中斷了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能優化操做(計算自旋次數) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存當前結點的匹配結點. * s.match==null說明尚未匹配結點 * s.match==s說明當前結點s對應的線程被中斷了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 尚未匹配結點, 則保存當前線程 s.waiter = w; // s.waiter保存當前阻塞線程 else if (!timed) LockSupport.park(this); // 阻塞當前線程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
最終,在下面分支中返回:
else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當前結點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當前調用線程 if (m == s) { // 阻塞過程當中被中斷 clean(s); return null; } // 此時m爲配對結點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結點的值 return (E) ((mode == REQUEST) ? m.item : s.item); }
注意:對於 入隊線程(生產者),返回的是它入隊時攜帶的 原有元素值。
SynchronousQueue的公平策略由TransferQueue類實現,TransferQueue內部定義了名爲QNode
的結點,一個head
隊首指針,一個tail
隊尾指針:
/** * Dual Queue(雙端隊列). * 公平策略時使用. */ static final class TransferQueue<E> extends Transferer<E> { /** * Head of queue */ transient volatile QNode head; /** * Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe; /** * 隊列結點定義. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; // ... } // ... }
關於TransferQueue的transfer方法就再也不贅述了,其思路和TransferStack大體相同,總之就是入隊/出隊必須一一匹配,不然任意一方就會加入隊列並等待匹配線程喚醒。讀者能夠自行閱讀TransferQueued的源碼。
TransferQueue主要用於線程之間的數據交換,因爲採用無鎖算法,其性能通常比單純的其它阻塞隊列要高。它的最大特色時不存儲實際元素,而是在內部經過棧或隊列結構保存阻塞線程。後面咱們講JUC線程池框架的時候,還會再次看到它的身影。