系列傳送門:java
SynchronousQueue是一個不存儲元素的阻塞隊列,每一個插入的操做必須等待另外一個線程進行相應的刪除操做,反之亦然,所以這裏的Synchronous指的是讀線程和寫線程須要同步,一個讀線程匹配一個寫線程。node
你不能在該隊列中使用peek方法,由於peek是隻讀取不移除,不符合該隊列特性,該隊列不存儲任何元素,數據必須從某個寫線程交給某個讀線程,而不是在隊列中等待倍消費,很是適合傳遞性場景。面試
SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。編程
該類還支持可供選擇的公平性策略,默認採用非公平策略,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格。併發
public class TestSync { public static void main (String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(true); Producer producer = new Producer(queue); Customer customer = new Customer(queue); producer.start(); customer.start(); } } class Producer extends Thread{ SynchronousQueue<Integer> queue; Producer(SynchronousQueue<Integer> queue){ this.queue = queue; } @SneakyThrows @Override public void run () { while(true){ int product = new Random().nextInt(500); System.out.println("生產產品, id : " + product); System.out.println("等待3s後給消費者消費..."); TimeUnit.SECONDS.sleep(3); queue.put(product); TimeUnit.MILLISECONDS.sleep(100); } } } class Customer extends Thread{ SynchronousQueue<Integer> queue; Customer(SynchronousQueue<Integer> queue){ this.queue = queue; } @SneakyThrows @Override public void run () { while(true){ Integer product = queue.take(); System.out.println("消費產品, id : " + product); System.out.println(); } } } // 打印結果 生產產品, id : 194 等待3s後給消費者消費... 消費產品, id : 194 生產產品, id : 140 等待3s後給消費者消費... 消費產品, id : 140 生產產品, id : 40 等待3s後給消費者消費... 消費產品, id : 40
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // put方法 : e是生產者傳遞給消費者的元素 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
public E take() throws InterruptedException { // take方法: 表示消費者等待生產者提供元素 E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
put方法和take方法都調用了transferer的transfer方法,他們的區別在哪呢?咱們能夠發現:app
這一點必須明確,transfer是根據這一點來判斷讀or寫線程,接着決定是否匹配等,直接來看下Transfer類吧。dom
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private transient volatile Transferer<E> transferer; }
SynchronousQueue內部維護了volatile修飾的Transferer變量,它的核心操做都將委託給transferer。ide
abstract static class Transferer<E> { /** * Performs a put or take. */ abstract E transfer(E e, boolean timed, long nanos); }
Transferer類中定義了抽象方法transfer,該方法用於轉移元素,是最最核心的方法,咱們先大概瞭解一下定義:工具
// 默認使用非公平策略 public SynchronousQueue() { this(false); } /** * 指定公平策略, */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
能夠發現,在構造SynchronousQueue的時候,能夠傳入fair參數指定公平策略,有下面兩種選擇:oop
他倆即是Transfer類的實現,SynchronousQueue相關操做也都是基於這倆類的,咱們接下來將會重點分析這倆的實現。
static final class TransferQueue<E> extends Transferer<E> { static final class QNode{...} transient volatile QNode head; transient volatile QNode tail; transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // 初始化虛擬頭節點 head = h; tail = h; }
QNode定義了隊列中存放的節點:
static final class QNode { volatile QNode next; // next域 volatile Object item; // 存放數據,用CAS設置 volatile Thread waiter; // 標記在該節點上等待的線程是哪一個 final boolean isData; // isData == true表示寫線程節點 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // ...省略一系列CAS方法 }
E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed // 判斷當前節點的模式 boolean isData = (e != null); // 循環 for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin // 隊列爲空 或 當前節點和隊列尾節點類型相同,則將節點入隊 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 說明有其餘節點入隊,致使讀到的tail不一致,continue if (t != tail) // inconsistent read continue; // 有其餘節點入隊,可是tail是一致的,嘗試將tn設置爲尾節點,continue if (tn != null) { // lagging tail advanceTail(t, tn); // 若是tail爲t,設置爲tn continue; } // timed == true 而且超時了, 直接返回null if (timed && nanos <= 0) // can't wait return null; // 構建一個新節點 if (s == null) s = new QNode(e, isData); // 將當前節點插入到tail以後,如不成功,則continue if (!t.casNext(null, s)) // failed to link in continue; // 將當前節點設置爲新的tail advanceTail(t, s); // swing tail and wait // 這個方法下面會分析:自旋或阻塞線程,直到知足s.item != e Object x = awaitFulfill(s, e, timed, nanos); // x == s 表示節點被取消、中斷或超時 if (x == s) { // wait was cancelled clean(t, s); return null; } // isOffList用於判斷節點是否已經出隊 next == this if (!s.isOffList()) { // not already unlinked // 嘗試將s節點設置爲head advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 隊列不爲空 且節點類型不一樣,一個讀一個寫,就能夠匹配了 } else { // complementary-mode // 隊頭節點 QNode m = h.next; // node to fulfill // 這裏若是其餘線程對隊列進行了操做,就從新再來 if (t != tail || m == null || h != head) continue; // inconsistent read // 下面是出隊的代碼 Object x = m.item; //isData == (x != null) 判斷isData的類型是否和隊頭節點類型相同 // x == m 表示m被取消了 // !m.casItem(x, e))表示將e設置爲m的item失敗 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS // 上面三種狀況,任意一種發生,都進行h的出隊操做,m變成head,而後重試 advanceHead(h, m); // dequeue and retry continue; } // 匹配成功,將m變爲head,虛擬節點 advanceHead(h, m); // successfully fulfilled // 喚醒在m上等待的線程 LockSupport.unpark(m.waiter); // 獲得數據 return (x != null) ? (E)x : e; } } }
這個方法將會進行自旋或者阻塞,直到知足某些條件。
//Spins/blocks until node s is fulfilled. Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 計算須要自旋的次數 // 若是剛好 s 正好是第一個加入的節點,則會自旋一段時間,避免阻塞,提升效率 // 由於其餘狀況是會涉及到 park掛起線程的 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // w爲當前線程,若是被中斷了,則取消該節點 if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 知足這個條件,纔會退出循環,也是惟一的出口 // 若是 線程一、被阻塞,接着喚醒或者二、中斷了,x != e 就會成立 if (x != e) return x; // 若是設置了timed,須要判斷一下是否超時 if (timed) { nanos = deadline - System.nanoTime(); // 若是超時,取消該節點,continue,下一次在 x!=e時退出循環 if (nanos <= 0L) { s.tryCancel(e); continue; } } // 每次減小自旋次數 if (spins > 0) --spins; // 次數用完了,設置一下s的等待線程爲當前線程 else if (s.waiter == null) s.waiter = w; // 沒有超時設置的阻塞 else if (!timed) LockSupport.park(this); // 剩餘時間小於spinForTimeoutThreshold的時候,自旋性能的效率更高 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
這邊總結一下一些注意點:
取消操做其實就是將節點的item設置爲this,
void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } boolean isCancelled() { return item == this; }
也就是說,若是一旦執行了tryCancel操做【中斷,取消,超時】,退出awaitFulfill以後,必定知足:
// x == s 表示節點被取消、中斷或超時 if (x == s) { // wait was cancelled clean(t, s); return null; }
會執行clean方法清理s節點:
void clean(QNode pred, QNode s) { s.waiter = null; // 清除thread引用 /* * 不管什麼時候,隊列中的最後一個節點都沒法刪除,所以使用cleanMe保存它的前驅 */ while (pred.next == s) { QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // 隊頭被取消的狀況,出隊 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) // 隊列此時爲空,就退出了 return; QNode tn = t.next; if (t != tail) // 隊尾併發改變了 continue; // tn一直定位到爲null if (tn != null) { advanceTail(t, tn); continue; } // 這裏 s!= t 表示沒有到要刪除的元素不是最後一個, // 那麼直接將pred.next = s.next就能夠了 if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) // 刪除完畢,退出 return; } // 走到這裏,說明須要刪除的s節點是隊尾節點,須要使用cleanMe QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node // d這裏指的就是 要刪除的節點 QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); // 清除cleanMe if (dp == pred) return; // s is already saved node // 該分支將dp定位到 pred的位置【第一次應該都會走到這】 } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
注意:不管什麼時候, 最後插入的節點不能被刪除,由於直接刪除會存在併發風險,當節點s是最後一個節點時, 將s.pred保存爲cleamMe節點,下次再進行清除操做。
transfer就是在一個循環中,不斷地去作下面這些事情:
注意:不管是上面哪一種狀況,都會不斷檢測是否有其餘線程在進行操做,若是有的話,會幫助其餘線程執行入隊出隊操做。
TransferStack就大體過一下吧:
static final class TransferStack<E> extends Transferer<E> { // 表示一個未匹配的消費者 static final int REQUEST = 0; // 表明一個未匹配的生產者 static final int DATA = 1; // 表示匹配另外一個生產者或消費者 static final int FULFILLING = 2; // 頭節點 volatile SNode head; // SNode節點定義 static final class SNode {...}
static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } }
E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; // e爲null表示讀,非null表示寫 for (;;) { SNode h = head; // 若是棧爲空,或者節點模式和頭節點模式相同, 將節點壓入棧 if (h == null || h.mode == mode) { // empty or same-mode // 處理超時 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) // 頭節點彈出 casHead(h, h.next); // pop cancelled node else return null; //未超時狀況,生成snode節點,嘗試將s設置爲頭節點 } else if (casHead(h, s = snode(s, e, h, mode))) { // 自旋,等待線程匹配 SNode m = awaitFulfill(s, timed, nanos); // 表示節點被取消、或中斷、或超時 if (m == s) { // wait was cancelled // 清理節點 clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 若是是請求數據,則返回匹配的item, 不然返回s的item return (E) ((mode == REQUEST) ? m.item : s.item); } // 棧不爲空, 且模式不相等,說明是一對匹配的節點 // 嘗試用節點s 去知足 h, 這裏判斷 (m & FULFILLING) == 0會走這個分支 } else if (!isFulfilling(h.mode)) { // try to fulfill // h已經被取消了 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 將當前節點 標記爲FULFILLING, 並設置爲head else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear // 這裏m是頭節點 SNode m = s.next; // m is s's match // 說明被其餘線程搶走了,從新設置head if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } // 獲得與m匹配的節點 SNode mn = m.next; // 嘗試去匹配,匹配成功會喚醒等待的線程 if (m.tryMatch(s)) { // 匹配成功,兩個都彈出 casHead(s, mn); // pop both s and m // 返回數據節點的值 m.item return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } // 走到這,表示有其餘線程在進行配對(m & FULFILLING) != 0 // 幫助進行匹配,接着執行出棧操做 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
transfer方法其實就是在一個循環中持續地去作下面三件事情:
FULFILLING
標記,將當前線程壓入棧頂,和棧中的節點進行匹配,匹配成功,出棧這兩個節點。isFulfilling(h.mode)
,則幫助它進行匹配並出棧,再執行後續操做。SynchronousQueue是一個不存儲元素的阻塞隊列,每一個插入的操做必須等待另外一個線程進行相應的刪除操做,反之亦然,所以這裏的Synchronous指的是讀線程和寫線程須要同步,一個讀線程匹配一個寫線程。
該類還支持可供選擇的公平性策略,針對不一樣的公平性策略有兩種不一樣的Transfer實現,TransferQueue實現公平模式和TransferStack實現非公平模式。
take和put操做都調用了transfer核心方法,根據傳入的參數e是否爲null來對應處理。
最後:Synchronous好抽象啊,好難懂,有不少地方畫了圖也是很難理解,若有不足,望評論區指教。
《Java併發編程的藝術》
《Java併發編程之美》