以前一篇文章已經講解了阻塞隊列SynchronousQueue的大部份內容,其中默認的非公平策略還未說明,本文就緊接上文繼續講解其中的非公平策略下的內部實現,順便簡單說明其涉及到的線程池部分的使用java
回顧一下,SynchronousQueue經過兩個內部類實現了公平策略和非公平策略的無緩存阻塞隊列,每種操做都須要對應的互補操做同時進行才能完成,例如,入隊操做必然對應出隊操做,在不涉及超時和中斷的狀況下,必須等待另外一個線程進行出隊操做,兩兩匹配才能執行,不然就阻塞等待node
以前已經對公平策略下的內部類實現TransferQueue作了詳細的說明,今天就非公平策略下的內部實現類TransferStack進行說明緩存
不一樣於公平策略下的操做,只有一種狀態須要注意:app
SNode基於棧的節點實現,變量與QNode有些不一樣,其中match在兩個操做匹配上以後能夠經過這個變量找到其匹配的節點,節點類型mode在使用上也有所不一樣,下面使用到時會進行說明,其餘參數可參考TransferQueue的QNode說明ide
static final class SNode { // next指向棧中下一個元素 volatile SNode next; // next node in stack // 和當前節點匹配的節點 volatile SNode match; // the node matched to this // 等待線程 volatile Thread waiter; // to control park/unpark // 節點內容 Object item; // data; or null for REQUESTs // 節點類型 int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } // CAS更新next字段 boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } /** * Tries to match node s to this node, if so, waking up thread. * Fulfillers call tryMatch to identify their waiters. * Waiters block until they have been matched. * * @param s the node to match * @return true if successfully matched to s */ // 嘗試s節點與當前節點進行匹配,成功則喚醒等待線程繼續執行 // 在使用到時才能理解,同時可參考我舉例上的圖示說明部分 boolean tryMatch(SNode s) { // match == null 代表當前節點未被其餘節點匹配上 // cas更新match字段爲s if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; // 當前節點等待線程未被其餘線程操做 if (w != null) { // waiters need at most one unpark // 喚醒等待線程同時將waiter置空 waiter = null; LockSupport.unpark(w); } return true; } // 判斷當前節點是否已與s進行匹配 return match == s; } /** * Tries to cancel a wait by matching node to itself. */ // 嘗試取消操做 將match置爲this void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } // 判斷tryCancel是否操做成功 boolean isCancelled() { return match == this; } // 獲取match和next在對象中的偏移量 // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
變量部分須要注意的就在於這3種類型,其中FULFILLING須要注意的是這個變量不是直接進行使用的,而是與其餘兩種操做進行位操做時使用oop
爲何是2呢?由於2在二進制中表示10,在高位上有個1,而REQUEST與DATA的0和1二進制只在最低位上用二進制0和1表示便可,咱們能夠經過FULFILLING與其餘兩種類型位操做使得高位不一樣來判斷節點是否已經被其餘節點匹配互補上,此時還能經過最低位判斷出此節點是什麼操做,固然,因爲是棧結構,主要在棧頂元素,這裏經過高位的不一樣來判斷出是去匹配節點操做仍是幫助匹配的兩個節點進行一些操做,在後面要說明的transfer部分你會看到有3個條件分支執行,第3個即爲幫助已經肯定匹配的兩個節點進行一些操做以便儘快完成出棧讓本身繼續執行匹配操做this
/** Node represents an unfulfilled consumer */ // 數據請求操做 如take操做 表明未被匹配上的消費者 static final int REQUEST = 0; /** Node represents an unfulfilled producer */ // 數據保存操做 如put操做 表明未被匹配上的生產者 static final int DATA = 1; /** Node is fulfilling another unfulfilled DATA or REQUEST */ // 有節點與其匹配,至關於已經有互補操做,使用上不是直接使用,可參考後面的源碼部分 static final int FULFILLING = 2; /** The head (top) of the stack */ // 棧頂指針 volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } }
CAS更新棧頂指針,比較簡單atom
boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); }
判斷m對應的節點是否已經被匹配,和FULFILLING進行位與操做,判斷m對應的棧節點處於FULFILLING狀態,即已經匹配上了,在transfer裏與棧頂節點非相同操做時會入棧一個節點,此節點的mode和普通節點不同,會經過FULFILLING|mode操做更新mode,故這裏最低位來區分是保存數據仍是請求數據,高位來區分此節點是不是已經找到匹配節點的節點,固然,只在這次操做中使用,具體參見下面方法的說明spa
/** Returns true if m has fulfilling bit set. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
建立或重置SNode節點,若是爲空則建立新的SNode節點,不爲空則重置節點的mode和next屬性線程
/** * Creates or resets fields of a node. Called only from transfer * where the node to push on stack is lazily created and * reused when possible to help reduce intervals between reads * and CASes of head and to avoid surges of garbage when CASes * to push nodes fail due to contention. */ static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; }
相似於在公平模式下的TransferQueue.transfer,入隊和出隊操做,統一使用一個方法,即實現接口中的transfer方法來完成,須要明白的是保存的是每次操做這個動做,固然,與TransferQueue.transfer有所不一樣的在於這裏有3個條件分支,按順序含義以下:
須要注意的就是上面屢次提醒的mode變量部分,須要好好理解
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // 節點類型,是put仍是take操做,便是保存數據仍是請求數據 int mode = (e == null) ? REQUEST : DATA; for (;;) { // 獲取棧頂指針 SNode h = head; // 棧爲空 // 或棧頂節點和當前操做節點爲相同操做 if (h == null || h.mode == mode) { // empty or same-mode // 設置超時時間且超時時間小於等於0 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) // 棧頂非空且棧頂節點爲取消操做狀態 // 出棧,嘗試將棧頂節點更新 casHead(h, h.next); // pop cancelled node else return null; // 建立節點,嘗試更新棧頂節點 } else if (casHead(h, s = snode(s, e, h, mode))) { // 經過awaitFulfill方法自旋阻塞找到匹配操做的節點,這個下面進行說明 // 能夠類比公平模式下的awaitFulfill SNode m = awaitFulfill(s, timed, nanos); // 取消或超時 if (m == s) { // wait was cancelled // 清理節點,取消本次操做 clean(s); return null; } // 棧頂節點更新爲s的next元素 // 執行到這一步時應該是棧頂兩個節點進行了匹配 // 出棧棧頂2個節點元素,幫助更新棧頂元素爲第三個節點元素即爲s.next // 固然,也可能另外一個棧頂節點線程幫助更新了 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 判斷下,若是當前是請求數據,即take操做,返回m.item值,即返回匹配節點的item // 當前是保存數據,即put操做,返回s.item return (E) ((mode == REQUEST) ? m.item : s.item); } // 與棧頂節點非相同操做,棧頂元素非匹配互補節點 } else if (!isFulfilling(h.mode)) { // try to fulfill // 棧頂元素處於取消操做狀態 if (h.isCancelled()) // already cancelled // 嘗試出棧更新棧頂元素 casHead(h, h.next); // pop and retry // 入棧新建立的節點,同時FULFILLING|mode 位與操做 // s的mode爲10或者11 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 進入這裏代表s已經爲棧頂節點,並且s.next是其匹配節點 // 循環直到匹配上 for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 空則可能被其餘線程匹配上了則更新頭節點爲null,從新進入外層循環 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node // 這裏s節點需置空,由於比較特殊,mode不一樣於普通節點 // 從新循環時根據狀況從新建立節點 s = null; // use new node next time break; // restart main loop } // SNode mn = m.next; // 嘗試m與s進行匹配,其實是更新m節點的match爲s,同時喚醒m的等待線程 if (m.tryMatch(s)) { // 成功則出棧棧頂兩個元素,即更新棧頂節點 casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 未匹配上,可能被其餘節點匹配上了,嘗試更新s的next指針,再繼續匹配 s.casNext(m, mn); // help unlink } } // 不知足上邊兩個條件,即此時棧頂爲匹配節點,還未匹配完成,這裏幫忙完成匹配出棧操做 // 注意,這裏只是幫助更新head和next並不作其餘操做,參考上面方法的處理 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
與TransferQueue.awaitFulfill相似,在當前操做同以前操做相同時,未設置操做時間同時未被外部線程中斷則需阻塞等待匹配節點喚醒當前阻塞的線程,總體上很是類似,因爲match的存在使得判斷對應的匹配節點要比TransferQueue.awaitFulfill簡單許多
/** * Spins/blocks until node s is matched by a fulfill operation. * * @param s the waiting node * @param timed true if timed wait * @param nanos timeout value * @return matched node, or s if cancelled */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 獲取超時時間點 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 當前線程 Thread w = Thread.currentThread(); // shouldSpin判斷是否須要進行自旋,下一個方法進行說明 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 判斷當前線程是否中斷,外部中斷操做,至關於取消本次操做 if (w.isInterrupted()) // 嘗試將s節點的match設置爲s本身,這樣判斷的時候就知道這個節點是被取消的 s.tryCancel(); SNode m = s.match; // match非空則表示當前節點已經被匹配match匹配上 if (m != null) return m; // 超時配置處理 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 自旋spins if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 設置等待線程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 未設置超時,直接阻塞 else if (!timed) LockSupport.park(this); // 設置超時時間阻塞 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
判斷是否須要自旋操做,知足下列狀況之一即須要自旋:
/** * Returns true if node s is at head or there is an active * fulfiller. */ boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); }
清理操做,清理棧節點s的關聯關係,同時會清理整個棧節點的取消操做節點,無cleanMe節點,比TransferQueue.clean操做要簡單許多
/** * Unlinks s from the stack. */ void clean(SNode s) { // item,waiter 置空 s.item = null; // forget item s.waiter = null; // forget thread // s的下一個節點處於取消操做狀態,則past指向past的下一個節點 SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head // 頭節點被取消操做則進行將next節點更新爲頭節點 SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes // 頭節點調整完畢,如今將棧節點中每一個節點都會進行檢查一遍,更新先後節點的關係,將取消操做的節點進行排除 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }
參考公平模式下的代碼,經過下列最簡單的示例進行說明,一個線程take操做,一個線程put操做,畫圖進行說明
public class SynchronousQueueTest { public static void main(String[] args) { BlockingQueue<String> sq = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(sq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { sq.put("test"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
1.建立非公平策略下的SynchronousQueue,new TransferStack<E>() 無參構造方法默認,變量上沒有進行任何操做
2.一線程執行take操做,以先執行take的線程爲例子進行說明,此時另外一線程put操做還未執行,take操做阻塞等待
3.另外一線程執行put操做,經過!isFulfilling判斷出當前棧頂未與其餘節點匹配,則其嘗試與棧頂節點匹配,成功則喚醒以前阻塞等待的take操做,同時處理完成
最終執行return (E) ((mode == REQUEST) ? m.item : s.item),獲取操做結果,固然,其中還有一個條件分支能夠幫助匹配互補更新操做,這部分自行讀者可自行畫圖理解
線程池使用Executors.newCachedThreadPool()方法建立可緩衝線程池,這裏看下源碼實現:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
核心線程設置爲0,最大線程池設置Integer.MAX_VALUE,存活時間60s,阻塞隊列使用SynchronousQueue,默認非公平模式,可緩衝線程池經過複用空閒線程提升效率,固然,若是咱們使用這種方式建立線程池可能會帶來一些問題
這會形成什麼問題呢?
這裏最大線程數設置爲Integer.MAX_VALUE,可能會建立很是多的線程,甚至致使OOM,因此阿里規範中說起了這部份內容,指出了其中存在的隱患,須要規避資源耗盡的風險,開發人員應直接使用ThreadPoolExecutor來建立線程池,每一個參數須要根據本身的需求進行設置
至此,SynchronousQueue的非公平策略的內部實現也已講解完畢,非公平策略下要注意其對於mode部分狀態的處理,經過高位和低位分別區分是否已匹配和是什麼類型的操做(生產者仍是消費者),理解了這部分,對於非公平模式下的總體操做流程也能很快熟悉,相對來講不是十分複雜,多畫圖觀察代碼執行過程能幫助更好的理解
SynchronousQueue做爲一個無數據緩衝的阻塞隊列,其內部經過兩個內部類(隊列和棧)分別實現了公平策略和非公平策略下的隊列操做,其實咱們須要記住的在於其操做必須是成雙成對的,在無超時無中斷的狀況下,一個線程執行入隊操做,必然須要另外一個線程執行出隊操做,此時兩操做互相匹配,同時完成操做,這也是其取名爲Synchronous(同時發生)的含義吧
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝