今天繼續來說解阻塞隊列,一個比較特殊的阻塞隊列SynchronousQueue,經過Executors框架提供的線程池cachedThreadPool中咱們能夠看到其被使用做爲可緩存線程池的隊列實現,下面經過源碼來了解其內部實現,便於後面幫助咱們更好的使用線程池java
JDK版本號:1.8.0_171
synchronousQueue是一個沒有數據緩衝的阻塞隊列,生產者線程的插入操做put()必須等待消費者的刪除操做take(),反過來也同樣。固然,也能夠不進行等待直接返回,例如poll和offernode
在使用上很好理解,每次操做都須要找到對應的匹配操做,如A線程經過put插入操做填入值1,若是無其餘線程操做則須要阻塞等待一個線程執行take操做A線程才能繼續,反過來一樣道理,這樣看彷佛synchronousQueue是沒有隊列進行保存數據的,每次操做都在等待其互補操做一塊兒執行緩存
這裏和其餘阻塞隊列不一樣之處在於,內部類將入隊出隊操做統一封裝成了一個接口實現,內部類數據保存的是每一個操做動做,好比put操做,保存插入的值,並根據標識來判斷是入隊仍是出隊操做,若是是take操做,則值爲null,經過標識符能判斷出來是出隊操做框架
多思考下,咱們須要找到互補的操做必然須要一個公共的區域來判斷已經發生的全部操做,內部類就是用來進行這些操做的,SynchronousQueue分爲公平策略(FIFO)和非公平策略(LIFO),兩種策略分別對應其兩個內部類實現,公平策略使用隊列結構實現,非公平策略使用棧結構實現ide
因爲篇幅過長,本篇先說明SynchronousQueue相關知識和公平策略下的實現類TransferQueue,下篇將說明非公平策略下的實現類TransferStack和其餘知識ui
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
/** The number of CPUs, for spin control */ // cpu數量,會在自旋控制時使用 static final int NCPUS = Runtime.getRuntime().availableProcessors(); // 自旋次數,指定了超時時間時使用,這個常量配合CAS操做使用,至關於循環次數 // 若是CAS操做失敗,則根據這個參數判斷繼續循環 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; // 自旋次數,未指定超時時間時使用 static final int maxUntimedSpins = maxTimedSpins * 16; /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ // 自旋超時時間閾值,在設置的時間超過這個時間時以這個時間爲準,單位,納秒 static final long spinForTimeoutThreshold = 1000L; // 後進先出隊列和先進先出隊列 @SuppressWarnings("serial") static class WaitQueue implements java.io.Serializable { } static class LifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3633113410248163686L; } static class FifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3623113410248163686L; } // 序列化操做使用 private ReentrantLock qlock; private WaitQueue waitingProducers; private WaitQueue waitingConsumers; /** * The transferer. Set only in constructor, but cannot be declared * as final without further complicating serialization. Since * this is accessed only at most once per public method, there * isn't a noticeable performance penalty for using volatile * instead of final here. */ // 全部的隊列操做都經過transferer來執行,統一方法執行 // 初始化時會根據所選的策略實例化對應的內部實現類 private transient volatile Transferer<E> transferer;
從上邊也能看出沒有設置變量來保存入隊出隊操做的數據,統一操做方法都放置到了Transferer中this
構造方法很清晰,根據所選的策略實現對應的Transferer內部接口實現類來進行隊列操做idea
// 默認非公平策略 public SynchronousQueue() { this(false); } // 可選策略,經過兩個內部類TransferQueue和TransferStack來實現公平策略(隊列)和非公平策略(棧) public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
抽象內部類Transferer,transfer方法用來替代put和take操做,每一個參數解釋以下:spa
返回值:非空則代表操做成功,返回消費的item或生產的item;空則代表因爲超時或中斷引發操做失敗。調用者能夠經過檢查Thread.interrupted判斷是哪一種緣由線程
/** * Shared internal API for dual stacks and queues. */ abstract static class Transferer<E> { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ abstract E transfer(E e, boolean timed, long nanos); }
入隊操做經過內部類調用transfer,傳參含義以下已在上面內部抽象類中說明,入隊元素e非空
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) // 超時返回false return false; // 線程中斷拋錯 throw new InterruptedException(); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; }
出隊操做經過內部類調用transfer,入隊元素e爲null
public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return transferer.transfer(null, true, 0); }
其餘方法以空隊列爲標準進行處理,好比隊列長度直接返回0,判空老是返回true,其餘方法相似,直接參考源碼,比較簡單,很少說
上面已經看到了最重要的核心方法在於transferer.transfer方法,那麼其具體的實現類中這個方法是如何實現的呢?
先說明公平策略下的實現類TransferQueue
基於Transferer實現公平策略下的實現類TransferQueue,既然是公平策略,則須要先進先出,這裏queue也代表其結構特色,內部經過QNode類實現鏈表的隊列形態,經過CAS操做更新鏈表元素
有兩種狀態須要注意:
QNode即爲隊列的鏈表實現,其中的變量屬性isData也能夠看出其保存的是每次的操做動做而不只僅是入隊的值,入隊操做會以QNode保存,出隊操做一樣會以QNode保存,同時變量都是經過CAS操做更新
static final class QNode { // next指向鏈表下一個節點 volatile QNode next; // next node in queue // 隊列元素的值 volatile Object item; // CAS'ed to or from null // 保存等待的線程 volatile Thread waiter; // to control park/unpark // 是否有數據,隊列元素的類型標識,入隊時有數據值爲true,出隊時無數據值爲false final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // cas操做更新next boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // cas操做更新item boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // cas操做取消操做,將當前的QNode的item賦值爲當前的QNode void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } // 判斷是否取消成功,緊跟着tryCancel操做後進行判斷 boolean isCancelled() { return item == this; } // 判斷當前節點是否已處於離隊狀態,這裏看到是將節點next指向本身 boolean isOffList() { return next == this; } // 獲取item和next的偏移量,操做CAS使用 // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
隊頭隊尾元素引用設置,須要注意的是cleanMe節點的含義,在具體方法操做中會進行說明
/** Head of queue */ // 隊頭 transient volatile QNode head; /** Tail of queue */ // 隊尾 transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ // 標記節點,清理鏈表尾部節點時,不直接刪除尾部節點,而是將尾節點的前驅節點next指向設置爲cleanMe // 防止此時向尾部插入節點的線程失敗致使出現數據問題 transient volatile QNode cleanMe; // 偏移量獲取 private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } }
頭尾節點初始化操做
TransferQueue() { // 初始化一個值爲null的QNode,初始化頭尾節點 QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
CAS更新變量操做
/** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ // 嘗試將nh更新爲新的隊頭 void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) // 原頭節點next指向更新爲本身,使得h爲離隊狀態,isOffList方法爲true h.next = h; // forget old next } /** * Tries to cas nt as new tail. */ // 嘗試更新隊尾節點 void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } /** * Tries to CAS cleanMe slot. */ // 嘗試更新cleanMe節點 boolean casCleanMe(QNode cmp, QNode val) { return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); }
入隊和出隊操做,統一使用一個方法,即實現接口中的transfer方法來完成,須要明白的是保存的是每次操做這個動做
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed // e爲null時至關於出隊操做isData爲false,入隊操做爲true boolean isData = (e != null); for (;;) { // 獲取最新的尾節點和頭節點 QNode t = tail; QNode h = head; // 頭,尾節點爲空,未初始化,則循環spin if (t == null || h == null) // saw uninitialized value continue; // spin // 首尾節點相同則爲空隊列或尾節點類型和新操做的類型相同,都是入隊操做或出隊操做 // 爲什麼只判斷尾部,由於若是頭節點和尾結點不一樣在隊列中不可能存在 // 一入隊和一出隊直接進入else匹配上不會再保存在鏈表中 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 尾節點已經被其餘線程更新修改,則從新循環判斷 if (t != tail) // inconsistent read continue; // tn非空,說明其餘線程已經添加了節點,嘗試更新尾節點,從新循環判斷 if (tn != null) { // lagging tail advanceTail(t, tn); continue; } // 設置超時時間而且超時時間小於等於0則直接返回null if (timed && nanos <= 0) // can't wait return null; // s爲null則初始化節點s if (s == null) s = new QNode(e, isData); // 嘗試將s添加到尾節點的next上,失敗則從新循環 if (!t.casNext(null, s)) // failed to link in continue; // 嘗試更新尾節點,尾節點此時爲s advanceTail(t, s); // swing tail and wait // 經過awaitFulfill方法自旋阻塞找到匹配操做的節點item,這個下面進行說明 Object x = awaitFulfill(s, e, timed, nanos); // 表示當前線程已經中斷或者超時,在awaitFulfill超時或者中斷時更新s.item指向本身 if (x == s) { // wait was cancelled // 清理節點,取消本次操做 clean(t, s); return null; } // 判斷s是否已從隊列移除,正常狀況下,出隊和入隊操做匹配上s節點確定是須要被清理掉的 if (!s.isOffList()) { // not already unlinked // 未被從隊列清除則嘗試更新隊頭 advanceHead(t, s); // unlink if head // 當前線程爲出隊操做時,s節點取消操做 if (x != null) // and forget fields s.item = s; // 清除等待線程 s.waiter = null; } return (x != null) ? (E)x : e; // 與上次隊列操做非同一類型操做,上次入隊,此次爲出隊,上次出隊,此次爲入隊纔會執行 // 匹配操做纔會執行下面邏輯 } else { // complementary-mode QNode m = h.next; // node to fulfill // 頭節點或尾節點被其餘線程更新或者爲空隊列則循環操做 if (t != tail || m == null || h != head) continue; // inconsistent read // 頭節點的下一個節點對應的item Object x = m.item; // 同類型,被取消操做或更新item失敗則更新頭節點指向從新操做 if (isData == (x != null) || // m already fulfilled 相同類型操做說明m已經被其餘線程操做匹配 x == m || // m cancelled 取消操做標識 // CAS更新item爲匹配上的操做值,好比當前是出隊操做,m爲入隊操做x爲入隊的值,那麼此時要替換爲出隊值null // CAS操做失敗 !m.casItem(x, e)) { // lost CAS // 刪除匹配上的頭節點,更新頭節點 advanceHead(h, m); // dequeue and retry continue; } // 更新頭節點 advanceHead(h, m); // successfully fulfilled // 釋放m的等待線程鎖使得m操做結束 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
在transfer相同類型操做時被調用,正常狀況下(不算超時和中斷)阻塞線程直到與之匹配的操做到來再繼續執行
/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ // 自旋或阻塞直到超時或被喚醒匹配上節點 // 好比此時是入隊操做,上次也是入隊操做,在未設置超時時,這裏可能須要自旋或阻塞等待一個出隊操做來喚醒本次入隊操做 // 至關於互補匹配上同時繼續完成後續操做,出隊操做拿走入隊操做的值才能完成 入隊操做被出隊操做獲取值才能完成 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ // 獲取超時時間點 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 當前線程 Thread w = Thread.currentThread(); // 僅在head.next==s時才使用spins(自旋次數),同時判斷是否設置了超時 // 非head.next則不走spins,至關於只是在第一次操做入鏈表時執行自旋spins操做,不是上來就進行阻塞 // 也能明白,在入隊和出隊操做匹配時 新操做是和頭節點匹配,故自旋必定次數而不是直接阻塞來提高執行效率,減小線程切換開銷 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 判斷當前線程是否中斷,外部中斷操做,至關於取消本次操做 if (w.isInterrupted()) // 嘗試將s節點的item設置爲s本身,這樣判斷的時候就知道這個節點是被取消的 s.tryCancel(e); Object x = s.item; // s的item已經改變,直接返回x // 沒改變的狀況下即沒有匹配的操做,有匹配上的item即x將被改變,取消時如上也會改變,以下超時也會改變 // 故return後還須要要區分出取消和超時的狀況 if (x != e) return x; // 線程超時將s節點的item設置爲s本身 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } // 須要自旋時循環 if (spins > 0) --spins; // 設置s的等待線程 else if (s.waiter == null) s.waiter = w; // 未設置超時,直接阻塞 else if (!timed) LockSupport.park(this); // 設置超時時間阻塞 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
清理s節點,同時須要關注cleanMe節點,總體處理過程以下:
/** * Gets rid of cancelled node s with original predecessor pred. */ // 中斷取消操做將pred節點代替s節點,修改先後節點之間的關聯 void clean(QNode pred, QNode s) { // 清理前先將等待線程置空 s.waiter = null; // forget thread // pred與s的先後關係 while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head // hn非空且被取消操做,更新頭節點爲hn if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } // 尾節點 QNode t = tail; // Ensure consistent read for tail // 空隊列返回 if (t == h) return; // 尾節點下一個 QNode tn = t.next; // 尾節點已被其餘線程更新 if (t != tail) continue; // 非空 更新尾節點 if (tn != null) { advanceTail(t, tn); continue; } // s不是尾節點 if (s != t) { // If not tail, try to unsplice // s的下一個節點 QNode sn = s.next; // 更新pred節點後一個節點爲s的下一個節點,至關於刪除s在鏈表中的關係 if (sn == s || pred.casNext(s, sn)) return; } // 執行到這裏說明s爲尾節點則須要處理cleanMe節點 QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node // 被清除的節點,從下面else部分代碼也能夠看出若是爲空,傳入的節點爲清理節點的前一個節點 // 這裏表明上次須要清理的cleanMe節點 // 這裏d表明真正須要清理的節點即dp.next QNode d = dp.next; QNode dn; if (d == null || // 清除節點爲null,至關於已經清理了 d == dp || // dp節點處於離隊狀態 !d.isCancelled() || // 清除節點被取消 (d != t && // 清除節點非尾節點 (dn = d.next) != null && // 清除節點下一節點非null dn != d && // 清除節點下一節點在隊列中 dp.casNext(d, dn))) // 清理d與其餘節點的關係 casCleanMe(dp, null); // 清理完畢設置爲null // 至關於s爲須要清理的節點,上邊已經清理過了,不須要再次清理 if (dp == pred) return; // s is already saved node // 更新cleanMe爲pred,爲下次清理準備 } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
TransferQueue的源碼操做上面已經說明完畢,爲了更好的理解其內部數據轉換,舉個例子同時畫圖進行說明方便各位理解:
public class SynchronousQueueTest { public static void main(String[] args) { BlockingQueue<String> sq = new SynchronousQueue<>(true); new Thread(() -> { try { System.out.println(sq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { sq.put("test"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
上面代碼很簡單,公平模式下,一個線程執行take操做,一個線程執行put操做,那麼SynchronousQueue內部是如何處理的呢?咱們以圖進行說明
1.建立公平策略下的SynchronousQueue
2.一線程執行take操做,以先執行take的線程爲例子進行說明,此時另外一線程put操做還未執行,take操做阻塞等待
3.另外一線程執行put操做,喚醒以前阻塞等待的take操做,同時處理完成
以後會進行節點的清理和頭尾節點的指向更新,這部分自行讀者可自行畫圖理解
SynchronousQueue是一個無數據緩衝的阻塞隊列,在不進行超時和中斷的狀況下,入隊操做需匹配出隊操做才能繼續執行下去,至關於進行互補操做,同時執行,成雙成對完成,在理解這點的基礎上,咱們能夠看到其擁有如下特色:
非公平策略下的實現類TransferStack和其餘知識將放在下篇文章進行說明
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝