SynchronousQueue 是一個同步阻塞隊列,它的每一個插入操做都要等待其餘線程相應的移除操做,反之亦然。SynchronousQueue 像是生產者和消費者的會合通道,它比較適合「切換」或「傳遞」這種場景:一個線程必須同步等待另一個線程把相關信息/時間/任務傳遞給它。java
SynchronousQueue(後面稱SQ)內部沒有容量,因此不能經過 peek 方法獲取頭部元素;也不能單獨插入元素,能夠簡單理解爲它的插入和移除是「一對」對稱的操做。爲了兼容 Collection 的某些操做(例如contains),SQ 扮演了一個空集合的角色。node
SQ 的一個典型應用場景是在線程池中,Executors.newCachedThreadPool() 就使用了它,這個構造使線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了 60s 後會被回收。算法
SQ 實現原理參考:http://ifeve.com/java-synchronousqueue/數據結構
阻塞算法實現一般在內部採用一個鎖來保證多個線程中的 put() 和 take() 方法是串行執行的。採用鎖的開銷是比較大的,還會存在一種狀況是線程 A 持有線程 B 須要的鎖,B 必須一直等待 A 釋放鎖,即便 A 可能一段時間內由於 B 的優先級比較高而得不到時間片運行。因此在高性能的應用中咱們經常但願規避鎖的使用。併發
public class NativeSynchronousQueue<E> { boolean putting = false; E item = null; public synchronized E take() throws InterruptedException { while (item == null) wait(); E e = item; item = null; notifyAll(); return e; } public synchronized void put(E e) throws InterruptedException { if (e==null) return; while (putting) wait(); putting = true; item = e; notifyAll(); while (item!=null) wait(); putting = false; notifyAll(); } }
經典同步隊列實現採用了三個信號量,代碼很簡單,比較容易理解:ide
public class SemaphoreSynchronousQueue<E> { E item = null; Semaphore sync = new Semaphore(0); Semaphore send = new Semaphore(1); Semaphore recv = new Semaphore(0); public E take() throws InterruptedException { recv.acquire(); E x = item; sync.release(); send.release(); return x; } public void put (E x) throws InterruptedException{ send.acquire(); item = x; recv.release(); sync.acquire(); } }
SQ 爲等待過程當中的生產者或消費者線程提供可選的公平策略(默認非公平模式)。非公平模式經過棧(LIFO)實現,公平模式經過隊列(FIFO)實現。使用的數據結構是雙重隊列(Dual queue)和雙重棧(Dual stack)。FIFO 一般用於支持更高的吞吐量,LIFO 則支持更高的線程局部存儲(TLS)。源碼分析
// 生產者 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } // 消費者 public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
put 和 take 都是直接委託 transferer 完成的。本節以公平式 TransferQueue 爲例分析 JDK8 的實現原理。性能
以上是 TransferQueue 的大體結構,能夠看到 TransferQueue 是一個普通的隊列,同時存在一個指向隊列頭部的指針 head,和一個指向隊列尾部的指針 tail;cleanMe 的存在主要是解決不可清楚隊列的尾節點的問題;隊列的節點經過內部類 QNode 封裝,QNode 是一個單鏈表結構,包含四個變量:優化
static final class QNode { volatile Object item; // 節點包含的數據,非空表示生產者,空者是消費者 final boolean isData; // 表示該節點由生產者建立仍是由消費者建立,生產者true,消費者false volatile Thread waiter; // 等待在該節點上的線程。to control park/unpark volatile QNode next; // 指向隊列中的下一個節點 }
SQ 的阻塞算法能夠歸結爲如下幾點:ui
(1) 雙重隊列
和典型的單向鏈表結構不一樣,SQ 使用了雙重隊列(Dual queue)和雙重棧(Dual stack)存儲數據,隊列中的每一個節點均可以是一個生產者或是消費者。
在消費者獲取元素時,若是隊列爲空,當前消費者就會做爲一個「元素爲null」的節點被放入隊列中等待,因此 QNode 中 的節點存儲了生產者節點(item!=null & isData=true)和消費者節點(item=null & isData=false),這兩種節點就是經過 isData 來區分的。但同一時間鏈表中要麼全是生產者,要麼全是消費者。
(2) 節點匹配
節點命中後修改 item 的狀態,已取消節點引用指向自身,避免垃圾保留和內存損耗。經過自旋和 LockSupport 的 park/unpark 實現阻塞,在高爭用環境下,自旋能夠顯著提升吞吐量。
若是全是生產者線程,當消費者線程調用 take 時會匹配鏈表中的元素,將第一個生產者線程節點 node 出隊,也就是 transfer 的過程。數據從一個線程 transfer 到另外一個線程,同時修改該節點 node 的狀態。若是全是消費者線程亦然。
以生產者線程入隊爲例:
下面主要分析 TransferQueue 的三個重要方法:transfer、awaitFulfill、clean。這三個方法是 TransferQueue 的核心,入口是 transfer(),下面具體看代碼。
// 生產者e!=null,消費者e=null。timed=true表示超時等待,不然無限等待 E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin // 1.1 h==t 表示尚未節點入隊 // 1.2 isData==isData 表示該隊列中的等待的線程與當前線程是相同模式 // (同爲生產者,或者同爲消費者,隊列中只存在一種模式的線程) // 總之只有生產者或只有消費者時,須要將該線程插入到隊列中進行等待 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // 其它線程修改了尾節點,continue continue; if (tn != null) { // 其它線程有節點入隊,幫助其它線程修改尾節點 tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) // 僅初始化一次s,經過區分isData生產者和消費者 s = new QNode(e, isData); // 2. 最重要的一步,上面判斷了這麼多數據不一致的狀況,最終完成節點入隊,失敗重試。 // 其實上面兩個 continue 不執行也沒有關係,大不了在這一步失敗後重試 // t 若是不是尾節點 next 確定不爲空。要麼指定本身(失效),要麼指向下一個節點。 if (!t.casNext(null, s)) // failed to link in continue; // 執行失敗沒有關係,會有其餘線程幫忙執行完成的 ok advanceTail(t, s); // swing tail and wait // 3. 等待其它線程匹配。二種狀況:一是匹配完成,返回數據;二是等待超時/取消,返回原節點s Object x = awaitFulfill(s, e, timed, nanos); // 3.1 等待超時/取消,返回原節點s if (x == s) { // wait was cancelled clean(t, s); return null; } // 3.2 匹配成功了,可是還須要將該節點從隊列中移除 if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 4. 若是隊列已有線程在等待,直接進行匹配便可 } else { // complementary-mode // 進行匹配,從隊列的頭部開始,即head.next QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read // 5.1 前面已經說過匹配成功會修改 item,併發時可能頭節點已經匹配過了 // isData == (x != null) 相等則說明 m 已經匹配過了,由於正常狀況是不相等纔對 // 5.2 x==m 說明 m 被取消了,見 QNode#tryCancel() // 5.3 CAS失敗說明 m 已經被其餘線程匹配了,因此將其出隊,而後 retry // CAS設置m.item爲e,這裏的e,若是是生產者則是數據,消費者則是null, // 因此m若是是生產者,則item變爲null,消費者則變爲生產者的數據 Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } // 6. 與m匹配成功,將m出隊,並喚醒等待在m上的線程m.waiter // 同上,失敗則說明有其它線程修改了頭節點 ok advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
從上面的代碼能夠看出 TransferQueue.transfer() 的總體流程:
// advanceHead 更新頭節點並將失效的頭節點踢出隊列(h.next = h) void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next }
/** * 等待匹配,該方法會進入阻塞,直到三種狀況下才返回: * a. 超時被取消了,返回值爲 s * b. 匹配上了,返回另外一個線程傳過來的值 * c. 線程被打斷,會取消,返回值爲 s */ Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 1. 自旋鎖次數,若是不是隊列的第一個元素則不自旋,由於壓根輪不上他,自旋只是浪費 CPU // 若是等待的話則自旋的次數少些,不等待就多些 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 2. 支持打斷 if (w.isInterrupted()) s.tryCancel(e); // 3. 若是s的item不等於e,有如下二種狀況,不論是哪一種狀況都不要再等待了,返回便可 // a. 超時或線程被打斷了,此時x==s // b. 匹配上了,此時x==另外一個線程傳過來的值 Object x = s.item; if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) // 自旋,直到spins==0,進入等待 --spins; else if (s.waiter == null) s.waiter = w; // 設置等待線程才能被喚醒 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
awaitFulfill() 主要涉及自旋以及 LockSupport.park() 兩個關鍵點,自旋可去了解自旋鎖的原理。
自旋鎖原理:經過空循環則霸佔着 CPU,避免當前線程進入睡眠,由於睡眠/喚醒是須要進行線程上下文切換的,因此若是線程睡眠的時間很段,那麼使用空循環可以避免線程進入睡眠的耗時,從而快速響應。可是因爲空循環會浪費 CPU,因此也不能一直循環。自旋鎖通常適合同步快很小,競爭不是很激烈的場景。
java 中大量運用了這樣的技術。凡有阻塞的操做都會這樣作,包括內置鎖在內,內置鎖其實也是這樣的,內置鎖分爲偏向鎖,輕量級鎖和重量級鎖,其中輕量級鎖其實就是自旋來替代阻塞。
固然須要自旋多長時間。這是一個根據不一樣狀況來設定的值並無一個準確的結論,一般來講競爭越激烈這樣多自旋一段時間老是好的,效果也越明顯,可是自旋時間過長會浪費 cpu 時間因此,設定時間仍是一個很依靠經驗的值。
在這裏實際上是這樣作的,首先看一下當前 cpu 的數量,NCPUS 而後分兩種狀況一種是設定了時間限的自旋時間。若是設定了時間限則使用 maxTimedSpins,若是 NCPUS 數量大於等於 2 則設定爲爲 32 不然爲 0,既一個 CPU 時不自旋;這是顯然了,由於惟一的 cpu 在自旋顯然不能進行其餘操做來知足條件。 若是沒有設定時間限則使用 maxUntimedSpins,若是 NCPUS 數量大於等於 2 則設定爲爲 32 * 16,不然爲 0;
另外還有一個參數 spinForTimeoutThreshold 這個參數是爲了防止自定義的時間限過長,而設置的,若是設置的時間限長於這個值則取這個 spinForTimeoutThreshold 爲時間限。這是爲了優化而考慮的。這個的單位爲納秒。
大概總結一下 clean 方法在作什麼?。
首先,這裏的隊列實際上是單向鏈表。因此只能設置後繼的節點而不能設置前向的節點,這會產生一個問題,就是加入隊列尾的節點失效了要刪除怎麼辦?咱們沒辦法引用隊列尾部倒數第二個節點。因此這裏採用了一個方法就是講當前的尾結點保存問 cleanMe 節點,這樣在下次再次清除的時候一般 cleanMe 一般就不是尾結點了,這樣就能夠刪除了。也就是每次調用的時候刪除的實際上是上次須要結束的節點。更多關於清除節點 clean
參考:
天天用心記錄一點點。內容也許不重要,但習慣很重要!