在前面的三篇文章中前後介紹了ForkJoin框架的任務組件(ForkJoinTask體系,CountedCompleter體系)源碼,並簡單介紹了目前的並行流應用場景.ForkJoin框架本質上是對Executor-Runnable/Callable-Future/FutureTask的擴展,它依舊支持經典的Executor使用方式,即任務+池的配合,向池中提交任務,並異步地等待結果.java
毫無疑問,前面的文章已經解釋了ForkJoin框架的新穎性,初步瞭解了工做竊取依託的數據結構,ForkJoinTask/CountedCompleter在執行期的行爲,也提到它們必定要在ForkJoinPool中進行運行和調度,這也是本文力求解決的問題.算法
ForkJoinPool源碼是ForkJoin框架中最複雜,最難理解的部分,且由於交叉依賴ForkJoinTask,CountedCompleter,ForkJoinWorkerThread,做者在前面單獨用兩篇文章分析了它們,之前兩篇文章爲基礎,重複部分本文再也不詳述.編程
首先看類簽名.api
//禁止僞共享 @sun.misc.Contended //繼承自AbstractExecutorService public class ForkJoinPool extends AbstractExecutorService
前面的幾篇文章不止一次強調過ForkJoin框架的"輕量線程,輕量任務"等概念,也提到少許線程-多數計算,資源空閒時竊取任務.並介紹了基於status狀態的調度(ForkJoinTask系列),不基於status而由子任務觸發完成的調度(CountedCompleter系列),顯然它們的共性就是讓線程在正常調度的前提下儘可能少的空閒,最大幅度利用cpu資源,僞共享/緩存行的問題在ForkJoin框架中顯然會是一個更大的性能大殺器.在1.8以前,通常經過補位的方式解決僞共享問題,1.8以後,官方使用@Contended註解,令虛擬機儘可能註解標註的字段(字段的狀況)或成員字段放置在不一樣的緩存行,從而規避了僞共享問題.數組
創建ForkJoinPool能夠直接new,也能夠使用Executors的入口方法.緩存
//Executors方法,顯然ForkJoinPool被稱做工做竊取線程池.參數指定了並行度. public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, //默認線程工廠,前文中已提過默認的ForkJoinWorkerThread ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //不提供並行度. public static ExecutorService newWorkStealingPool() { return new ForkJoinPool //使用全部可用的處理器 (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //對應的,ForkJoinPool的構造器們. //不指定任何參數. public ForkJoinPool() { //並行度取MAX_CAP和可用處理器數的最小值. this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), //默認的線程工廠.無異常處理器,非異步模式. defaultForkJoinWorkerThreadFactory, null, false); } //同上,只是使用參數中的並行度. public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { //並行度須要校驗 this(checkParallelism(parallelism), //校驗線程工廠 checkFactory(factory), //參數指定的未捕獲異常處理器. handler, //前面的幾處代碼asyncMode都是false,會選用LIFO隊列,是true是會選用FIFO隊列,後面詳述. asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //線程名前綴 "ForkJoinPool-" + nextPoolId() + "-worker-"); //檢查許可,不關心. checkPermission(); } //檢查方法很簡單. //並行度不能大於MAX_CAP不能不大於0. private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; } //線程工廠非空便可. private static ForkJoinWorkerThreadFactory checkFactory (ForkJoinWorkerThreadFactory factory) { if (factory == null) throw new NullPointerException(); return factory; } //最終構造器,私有.待介紹完一些基礎字段後再述. private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; //config初始化值,用並行度與mode取或,顯然mode是FIFO時,將有一個第17位的1. this.config = (parallelism & SMASK) | mode; //np保存並行度(正數)的相反數(補碼). long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
瞭解其餘線程池源碼的朋友能夠去回憶其餘線程池的構建,不管是調度線程池仍是普通的線程池或者緩存池,他們其實都設置了核心線程數和最大線程數.固然這要看定義"線程池分類"的視角,以Executors入口的api分類,或許能夠分類成固定線程池,緩衝池,單線程池,調度池,工做竊取池;但以真正的實現分類,其實只有ThreadPoolExecutor系列(固定線程池,單線程池都直接是ThreadPoolExecutor,調度池是它的子類,緩衝池也是ThreadPoolExecutor,只是阻塞隊列限定爲SynchronizedQueue)和ForkJoinPool系列(工做竊取池).安全
做者更傾向於用實現的方式區分,也間接參照Executors的api使用用途的區分方式.若是不使用Executors的入口api,不論哪一種ThreadPoolExecutor系列,咱們均可以提供線程池的大小配置,阻塞隊列,線程空閒存活時間及單位,池滿拒絕策略,線程工廠等,而所謂的緩存池和固定池的區別只是隊列的區別.數據結構
調度池的構造參數與ThreadPoolExecutor無異,只是內限了阻塞隊列的類型,它雖然是ThreadPoolExecutor的擴展,卻不只沒有拓充參數,反而減小了兩個參數:阻塞隊列和最大線程數.阻塞隊列被默認設置爲內部類DelayQueue,它實現了BlockingQueue,最大線程數則爲整數上限,同時新增的對任務的延時或重試等屬性則是依託於內部維護的一個FutureTask的擴展,並未增長到構造參數.併發
而到了ForkJoinPool,咱們看到的是大相徑庭於ThreadPoolExecutor系列的構建方式.首先根本沒有提供核心線程和最大線程數,線程空閒存活時間的參數和阻塞隊列以及池滿拒絕策略;線程工廠也僅能提供生產ForkJoinWorkerThread的工廠bean;還具有一些ThreadPoolExecutor沒有的參數,如未捕獲異常處理器,同步異步模式,工做線程前綴(其實別的類型的線程工廠也能夠提供線程前綴,默認就是常見的pool-前綴)等.框架
顯然從參數看即可猜想出若干不一樣於其餘線程池的功能.但咱們更關心其中的一些參數設置.
通常的參數都能見名知義,僅有config和ctl難以理解,此處也不詳細介紹,只說他們的初值的初始化.
config是並行度與SMASK取與運算再與mode取或,這裏並行度最大是15位整數(MAX_CAP=0x7FFF),而SMASK做用於整數後16位,mode在FIFO爲1<<16,LIFO是0.很好計算.
ctl實際上是一個控制信號,咱們後面會在具體源碼就地解釋,它的計算先經過了一個局部變量np.
np的計算方法是將並行度的相反數(補碼)轉換爲長整型.前面簡單分析,並行度不會大於MAX_CAP,所以np至少前49位所有是1.
計算ctl時,將np左移AC_SHIFT即爲取後16位,將np左移TC_SHIFT即取它的後32位,分別與AC_MASK和TC_SHIFT,表示取np的後16位分別放置於ctl的前16位和33至48位.而ctl的後32位初值爲0.
由於生成的ctl前16位和後16位相等,若是仔細用數學驗證,能夠發現,對前16位和後16位的末位同時加1,當添加了parallel次後,ctl將歸0.這也是添加worker限制的重要數理依據.
前面列舉了獲取ForkJoinPool實例的幾種方法,初步展現了構造一個ForkJoinPool的屬性,也暴露了一些實現細節,而這些細節依賴於一些字段和成員函數,咱們先從它們開始.
//ForkJoinWorkerThread的線程工廠. public static interface ForkJoinWorkerThreadFactory { //建立新線程要實現的方法. public ForkJoinWorkerThread newThread(ForkJoinPool pool); } //前面看到的默認線程工廠. static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } //建立InnocuousForkJoinWorkerThread的線程工廠,上一文已經介紹過. static final class InnocuousForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext innocuousAcc; static { Permissions innocuousPerms = new Permissions(); innocuousPerms.add(modifyThreadPermission); innocuousPerms.add(new RuntimePermission( "enableContextClassLoaderOverride")); innocuousPerms.add(new RuntimePermission( "modifyThreadGroup")); innocuousAcc = new AccessControlContext(new ProtectionDomain[] { new ProtectionDomain(null, innocuousPerms) }); } public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread) java.security.AccessController.doPrivileged( new java.security.PrivilegedAction<ForkJoinWorkerThread>() { public ForkJoinWorkerThread run() { return new ForkJoinWorkerThread. InnocuousForkJoinWorkerThread(pool); }}, innocuousAcc); } } //空任務 static final class EmptyTask extends ForkJoinTask<Void> { private static final long serialVersionUID = -7721805057305804111L; EmptyTask() { status = ForkJoinTask.NORMAL; } //狀態直接是已正常完成. public final Void getRawResult() { return null; } public final void setRawResult(Void x) {} public final boolean exec() { return true; } }
以上是線程工廠和一個默認的EmptyTask.接下來看一些跨池和工做隊列的公用常量.
// 與邊界有關的常量 static final int SMASK = 0xffff; // 後16位. static final int MAX_CAP = 0x7fff; // 前面在定並行度時參考的最大容量. static final int EVENMASK = 0xfffe; // 後16位驗偶數 static final int SQMASK = 0x007e; // 最大64個偶數槽,從第2位至7位共6位,2的6次方. // 與WorkQueue有關 static final int SCANNING = 1; // 對WorkQueue正在運行任務的標記 static final int INACTIVE = 1 << 31; // 標記負數 static final int SS_SEQ = 1 << 16; // 版本號使用,第17位1 // ForkJoinPool和WorkQueue的config有關常量. static final int MODE_MASK = 0xffff << 16; // 能濾取前16位. static final int LIFO_QUEUE = 0;//前面提到過的,非async模式(false),值取0. static final int FIFO_QUEUE = 1 << 16;//async模式(true),值取1. static final int SHARED_QUEUE = 1 << 31; // 共享隊列標識,符號位表示負.
以上的字段含義只是粗略的描述,先有一個印象,後面看到時天然理解其含義.
接下來看核心的WorkQueue內部類.
//前面的文章說過,它是一個支持工做竊取和外部提交任務的隊列.顯然,它的實例對內存部局十分敏感, //WorkQueue自己的實例,或者內部數組元素均應避免共享同一緩存行. @sun.misc.Contended static final class WorkQueue { //隊列內部數組的初始容量,默認是2的12次方,它必須是2的幾回方,且不能小於4. //但它應該設置一個較大的值來減小隊列間的緩存行共享. //在前面的java運行時和54篇java官方文檔術語中曾提到,jvm一般會將 //數組放在可以共享gc標記(如卡片標記)的位置,這樣每一次寫都會形成嚴重內存競態. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大內部數組容量,默認64M,也必須是2的平方,但不大於1<<(31-數組元素項寬度), //根據官方註釋,這能夠確保無需計算索引歸納,但定義一個略小於此的值有助於用戶在 //系統飽合前捕獲失控的程序. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // unsafe機制有關的字段. private static final sun.misc.Unsafe U; private static final int ABASE; private static final int ASHIFT; private static final long QTOP; private static final long QLOCK; private static final long QCURRENTSTEAL; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> wk = WorkQueue.class; Class<?> ak = ForkJoinTask[].class; //top字段的句柄. QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); //qlock字段的句柄. QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); //currentSteal的句柄 QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); //ABASE是ForkJoinTask數組的首地址. ABASE = U.arrayBaseOffset(ak); //scale表明數組元素的索引大小.它必須是2的平方. int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //計算ASHIFT,它是31與scale的高位0位數量的差值.由於上一步約定了scale必定是一個正的2的幾回方, //ASHIFT的結果必定會大於1.能夠理解ASHIFT是數組索引大小的有效位數. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } //插曲,在Integer類的numberOfLeadingZeros方法,果真一流的程序是數學. public static int numberOfLeadingZeros(int i) { // HD, Figure 5-6 if (i == 0) //i自己已經是0,毫無疑問地返回32.本例中i是2起,因此不會. return 32; //先將n初始化1.最後會減掉首位1. int n = 1; //i的前16位不存在非零值,則將n加上16並移除i的前16位.將i轉換爲一個以原i後16位開頭的新值. if (i >>> 16 == 0) { n += 16; i <<= 16; } //不論前一步結果如何,若此時i的前8位不存在非零值,則n加上8,i移除前8位.將i轉換爲原i的後24位開頭的新值. if (i >>> 24 == 0) { n += 8; i <<= 8; } //不論前一步結果如何,若此時i的前4位不存在非零值,則n加上4,i移除前4位.將i轉換爲原i的後28位開頭的新值. if (i >>> 28 == 0) { n += 4; i <<= 4; } //不論前一步結果如何,若此時i的前2位不存在非零值,則n加上2,i移除前2位.將i轉換爲原i的後30位開頭的新值. if (i >>> 30 == 0) { n += 2; i <<= 2; } //通過前面的運算,i的前30位的非零值數量已經記入n, //在前一步的基礎上,此時i的前1位若存在非零值,則n-1,不然n保留原值. n -= i >>> 31; return n; } //回到WorkQueue // 實例字段 volatile int scanState; // 版本號,小於0表明不活躍,註釋解釋奇數表明正在掃描,但從代碼語義上看正好相反. int stackPred; // 前一個池棧控制信號(ctl),它保有前一個棧頂記錄. int nsteals; // 偷盜的任務數 int hint; // 一個隨機數,用於決定偷取任務的索引. int config; // 配置,表示池的索引和模式 volatile int qlock; // 隊列鎖,1表示鎖了,小於0表示終止,其餘狀況是0. volatile int base; // 底,表示下一個poll操做的插槽索引 int top; // 頂,表示下一個push操做的插槽索引 ForkJoinTask<?>[] array; // 存聽任務元素的數組,初始不分配,首擴容會分配. final ForkJoinPool pool; // 包含該隊列的池,可能在某些時刻是null. final ForkJoinWorkerThread owner; // 持有該隊列的線程,若是隊列是共享的,owner是null. volatile Thread parker; // 在調用park阻塞的owner,非阻塞時爲null volatile ForkJoinTask<?> currentJoin; // 被在awaitJoin中join的task. volatile ForkJoinTask<?> currentSteal; // 字面意思當前偷的任務,主要用來helpStealer方法使用. //工做隊列構造器,只初始化線程池,owner等字段. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) //base和top初始均爲INITIAL_QUEUE_CAPACITY的一半,也就是2的11次方. base = top = INITIAL_QUEUE_CAPACITY >>> 1; } //返回本隊列在池中的索引,使用config的2至4位表示.由於config的最後一位是奇偶位,忽略. final int getPoolIndex() { return (config & 0xffff) >>> 1; } //返回隊列中的任務數. final int queueSize() { //非owner的調用者必須先讀base,用base-top,獲得的結果小於0則取相反數,不然取0. //忽略即時的負數,它並不嚴格準確. int n = base - top; return (n >= 0) ? 0 : -n; } //判斷隊列是否爲空隊.本方法較爲精確,對於近空隊列,要檢查是否有至少一個未被佔有的任務. final boolean isEmpty() { ForkJoinTask<?>[] a; int n, m, s; //base大於等於top,說明空了. return ((n = base - (s = top)) >= 0 || //有容量,且剛好計算爲1,可能只有一個任務. (n == -1 && //計算爲1,再驗數組是否是空的. ((a = array) == null || (m = a.length - 1) < 0 || //取該位置元素的值判空,空則說明isEmpty. //取值的方式是取ForkJoinTask.class首地址加上偏移量(數組長度減一(最後一個元素位置,經典案例32-1)與運算top減一左移ASHIFT(索引大小有效位數)位)的值. U.getObject (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); } //將一個任務壓入隊列,前文提過的fork最終就會壓隊.但此方法只能由非共享隊列的持有者調用. //當使用線程池的"外部壓入"externalPush方法時,壓入共享隊列. final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; //保存當時的base top. int b = base, s = top, n; //若是數組被移除則忽略. if ((a = array) != null) { //數組最後一個下標.如長度32,則m取31這個質數.此時保存一個m,對於保存後其餘push操做至關於打了屏障. int m = a.length - 1; //向數組中的指定位置壓入該任務.位置包含上面的m和s進行與運算(數組中的位置),結果左移索引有效長度位(索引長度),再加上數組首索引偏移量(起始地址). U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //將top加1. U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { //計算舊的任務數量,發現不大於1個,說明原來極可能工做線程正在阻塞等待新的任務.須要喚醒它. if ((p = pool) != null) //signalWork會根據狀況,添加新的工做線程或喚醒等待任務的線程. p.signalWork(p.workQueues, this); } else if (n >= m)//2. //任務數量超出了,對數組擴容. growArray(); } } //添加任務過程主流程無鎖,包括可能出現的growArray.當原隊列爲空時,它會初始化一個數組,不然擴容一倍. //持有者調用時,不須要加鎖,但當其餘線程調用時,須要持有鎖.在resize過程當中,base能夠移動,但top否則. final ForkJoinTask<?>[] growArray() { //記錄老數組. ForkJoinTask<?>[] oldA = array; //根據老數組決定新容量,老數組空則INITIAL_QUEUE_CAPACITY不然國倍. int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) //新大小大於最大數組大小則拒絕. throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; //直接將原來的數組引用替換成新的. ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; //若是是初次分配,就此打住返回a,是擴容,且老數組非空則進入下面的循環拷貝. if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { //根據前面的運算,size必定是2的冪,減一用來哈希,這是經典處理辦法. int mask = size - 1; do { ForkJoinTask<?> x; //老數組base自增過若干次的獲得b,它表明的元素對應的索引. int oldj = ((b & oldMask) << ASHIFT) + ABASE; //用b在新數組中找出索引. int j = ((b & mask) << ASHIFT) + ABASE; //老數組中用索引取出元素. x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); if (x != null && //老數組置空,放入新數組. U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); //每處理完一個task,就將base自增1,直到top爲止. } while (++b != t); } //返回新數組. return a; } //存在下一個任務,彈出,順序是後進先出.此方法僅限非共享隊列的owner調用. final ForkJoinTask<?> pop() { ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; //還有元素. if ((a = array) != null && (m = a.length - 1) >= 0) { //1.top至少比base大一.注意,每次循環都會讀出新的top,它是volatile修飾的. for (int s; (s = top - 1) - base >= 0;) { //top對應的索引. long j = ((m & s) << ASHIFT) + ABASE; //2.該索引沒有元素,break,返回null.並且就表明這個位置的確是null,與競態無關. //由於此方法僅owner線程使用,不會出現另外一個線程計算了一樣的j,且先執行了3的狀況. //出現這種狀況,則是此位置的任務當先被執行並出棧,或者就從未設置過任務,後續分析這種極端狀況. //故若是出現某個任務在數組的中間,提早被執行並置空(非pop或poll方式),那麼再對WorkQueue進行pop時將會中斷, //留下一部分null以後的任務不能出棧,因此能夠容許任務非pop或poll方式查出並執行,但爲了能pop出全部任務,不能中間置null. if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) break; //3.有元素,將該索引位置置null.若cas失敗,說明元素被取出了, //但下次循環即便在2處break並返回null,也不是由於競態,由於每次循環到1都會讀取新的top, //也就有新的j. if (U.compareAndSwapObject(a, j, t, null)) { //數組位置置null的同時top減1. U.putOrderedInt(this, QTOP, s); return t; } } } //循環退出,說明top位置沒有元素,也至關於說明數組爲空.顯然此方法的另外一個做用是將隊列壓縮,空隊列會將top先降到base+1,再循環最後一次將top降到base. return null; } //若是b是base,使用FIFO的次序嘗試無競態取底部的任務.它會在ForkJoinPool的scan和helpStealer中使用. final ForkJoinTask<?> pollAt(int b) { ForkJoinTask<?> t; ForkJoinTask<?>[] a; if ((a = array) != null) { //和前面同樣的的方式計算b對應的索引j int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && //j對應位置有task且當前base==b,嘗試將task出隊. base == b && U.compareAndSwapObject(a, j, t, null)) { //出隊成功base增1.不須要額外的同步,由於兩個線程不可能同時在上面的cas成功. //當一切條件匹配(b就是base且j位置有元素),pollAt同一個b只會有一個返回非空的t. //若是多個線程傳入的b不相等,在同一時刻只有一個會等於base. base = b + 1; return t; } } return null; } //用FIFO的次序取下一個任務. final ForkJoinTask<?> poll() { ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; //1.循環從base取任務,當base增加到top或其餘操做重置array爲null則終止循環. while ((b = base) - top < 0 && (a = array) != null) { //前面已敘述過取索引的邏輯,使用一個top到base間的數與數組長度-1與運算並左移索引長度位再加上數組基準偏移量.後面再也不綴述. int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //取出task t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); //2.若是發生競態,base已經不是b,直接開啓下一輪循環把新的base讀給b. if (base == b) { if (t != null) { //3.當前t是base任務,用cas置空,base+1,返回t. //若是此處發生競態,則只有一個線程能夠成功返回t並重置base(4). //不成功的線程會開啓下一輪循環,此時成功線程可能將來的及執行4更新base, //也可能已經更新base,則致使先前失敗的線程在2處經過,經5種或判隊列空返回,或非空再次循環,而 //在當前成功線程執行4成功後,全部前面失敗的線程能夠在1處讀到新的base,這些線程 //在下一次循環中依舊只會有一個成功彈出t並重置base,直到全部線程執行完畢. if (U.compareAndSwapObject(a, j, t, null)) { //4重置加返回 base = b + 1; return t; } } //5.t取出的是空,發現此時臨時變量b(其餘成功線程在此輪循環前置的base)已增至top-1,且當前線程又沒能成功的彈出t,說明必定會有一個線程 //將t彈出並更新base到top的值,當前線程不必再開下一個循環了,直接break並返回null. //t取出的是空,可是沒到top,說明只是被提早執行並置空了,那麼繼續讀取新的base並循環,且若沒有其餘線程去更改base,array的長度,或者把top降到 //base,則當前線程就永遠死循環下去了,由於每次循環都是125且每一個變量都不變.所以爲避免循環,每一個任務能夠提早執行,但必定不能提早離隊(置null). //也就是說:只能用poll或pop方式彈出任務,其餘方式得到任務並執行是容許的,但不能在執行後置null,留待後續源碼驗證一下. else if (b + 1 == top) // now empty break; } } //從循環退出來有兩種狀況,多是在5處知足退出條件,或者在2處發現b已是髒數據,下輪循環不知足循環條件所致.兩種都應該返回null. return null; } //根據mode來取下一個本隊列元素.根據模式. final ForkJoinTask<?> nextLocalTask() { //當前WorkQueue的配置了FIFO,則poll,不然pop. //儘管還未看到註冊worker的源碼,在此提早透露下,ForkJoinPool也有一個config(前面講構造函數提過) //該config保存了mode信息,並原樣賦給了WorkQueue的mode.注意,相應的任務會出隊. return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } //根據模式取出下一個任務,可是不出隊. final ForkJoinTask<?> peek() { ForkJoinTask<?>[] a = array; int m; //空隊,返回null. if (a == null || (m = a.length - 1) < 0) return null; //根據mode定位要取的索引j. int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) << ASHIFT) + ABASE; //返回讀出的值,不出隊. return (ForkJoinTask<?>)U.getObjectVolatile(a, j); } //若是參數t是當前隊的top,則彈出. final boolean tryUnpush(ForkJoinTask<?> t) { ForkJoinTask<?>[] a; int s; if ((a = array) != null && (s = top) != base && //1.知足非空條件.嘗試用t去當看成計算出的索引位置的原任務的值並cas爲null來出隊. U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { //cas成功,說明t確實是top,將top減一返回true. U.putOrderedInt(this, QTOP, s); return true; } //2.cas失敗或不知足1的條件,返回false. return false; } //移除並取消隊列中全部已知的任務,忽略異常. final void cancelAll() { ForkJoinTask<?> t; if ((t = currentJoin) != null) { //有currentJoin,引用置空,取消並忽略異常. currentJoin = null; ForkJoinTask.cancelIgnoringExceptions(t); } if ((t = currentSteal) != null) { //有currentSteal,引用置空,取消並忽略異常. currentSteal = null; ForkJoinTask.cancelIgnoringExceptions(t); } //除了上面兩個,就只剩下數組中的任務了.按LILO的順序彈出並依次取消,忽略全部異常. while ((t = poll()) != null) ForkJoinTask.cancelIgnoringExceptions(t); } // 如下是執行方法. //按FIFO順序從隊首彈出任務並執行全部非空任務. final void pollAndExecAll() { for (ForkJoinTask<?> t; (t = poll()) != null;) //很明顯,若是未按嚴格順序執行,先執行中間的一個任務, //再調用本方法,則會半路停止. t.doExec(); } //移除並執行完全部本隊列的任務,若是是先進先出,則執行前面的pollAndExecAll方法. //不然pop循環執行到空爲止.按前面的分析,只要堅持只能pop或poll彈出,其餘方式執行任務但不能置空的原則, //能夠保證pop或poll出現空的狀況只能是競態發生的狀況. final void execLocalTasks() { int b = base, m, s; ForkJoinTask<?>[] a = array; //初始知足條件,top至少比base大1.隊列非空. if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //不是FIFO模式. if ((config & FIFO_QUEUE) == 0) { for (ForkJoinTask<?> t;;) { //原子getAndSet,查出並彈出本來的task if ((t = (ForkJoinTask<?>)U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) //彈出的task是空,break.說明整個工做流程中,若是未保證嚴格有序, //如先從中間的某個任務開始執行而且出隊了,再調用execLocalTasks,會致使中間停頓. //只執行不出隊,則至少不會中斷.出現t是null的狀況只能是競態或末尾. break; //top減一,執行任務. U.putOrderedInt(this, QTOP, s); t.doExec(); //若是base大於等於top,則停止. if (base - (s = top - 1) > 0) break; } } //是FIFO模式,pollAndExecAll. else pollAndExecAll(); } } //重點入口方法來了,前面留下諸多關於執行任務是否出隊的討論,下面來分析入口方法. //該方法的入口是每一個工做線程的run方法,所以只有一個線程. final void runTask(ForkJoinTask<?> task) { //傳入task是空直接不理會. if (task != null) { //標記成忙.scanState是WorkQueue的成員變量,每一個WorkQueue只有一個值, //前面說過,通常狀況下,每一個線程會有一個WorkQueue,因此某種狀況來說也能夠標記爲 //當前ForkJoinWorkerThread繁忙. //SCANNING常量值是1,這個操做實質上就是將scanState變量的個位置0,也就是變成了偶數並標記它要忙了. //顯然偶數才表示忙碌,這也是爲何前面以爲官方註釋scanState是奇數表示"正在掃描"很奇怪. scanState &= ~SCANNING; //將currentSteal設置爲傳入的任務,並運行該任務,若該任務內部進行了分叉,則進入相應的入隊邏輯. (currentSteal = task).doExec(); //執行完該任務後,將currentSteal置空.將該task釋放掉,幫助gc. U.putOrderedObject(this, QCURRENTSTEAL, null); //調用前面提到的,根據mode選擇依次pop或poll的方式將本身的工做隊列內的任務出隊並執行的方法. execLocalTasks(); //到此,本身隊列中的全部任務都已經完成.包含偷來的任務fork後又入隊到本身隊列的子任務. //取出owner線程.處理偷取任務有關的一些信息. ForkJoinWorkerThread thread = owner; if (++nsteals < 0) //發現當前WorkQueue偷來的任務數即將溢出了,將它轉到線程池. transferStealCount(pool); //取消忙碌標記. scanState |= SCANNING; if (thread != null) //執行afterTopLevelExec勾子方法,上一節中介紹ForkJoinWorkerThread時已介紹. thread.afterTopLevelExec(); } //方法結束,注意,沒有任何操做將task從所在的數組中移除,不論這個task是哪一個WorkQueue中的元素. //同時,此方法原則上講能夠屢次調用(儘管事實上就一次調用),入口處和出口處分別用忙碌標記來標記scanState,但重複標記顯然不影響執行. } //若是線程池中已經初始化了用於記錄的stealCounter,則用它加上當前WorkQueue的nsteals/或最大整數(發生溢出時). //並初始化當前WorkQueue的nsteals. final void transferStealCount(ForkJoinPool p) { AtomicLong sc; if (p != null && (sc = p.stealCounter) != null) { //線程池中存放了stealCounter,它是一個原子整數. int s = nsteals; nsteals = 0; //恢復0. //若nsteals是負,增長最大整數,不然增長nsteal sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); } } //若是task存在,則將它從隊列中移除並執行,發現有位於頂部的取消任務,則移除之,只用於awaitJoin. //若是隊列空而且任務不知道完成了,則返回true. final boolean tryRemoveAndExec(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; int m, s, b, n; //進入if的條件,存在非空任務數組,參數task非空. if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { //循環條件,隊列非空.從s開始遍歷到b,也就是從頂到底.後進先出. while ((n = (s = top) - (b = base)) > 0) { //1.內層循環. for (ForkJoinTask<?> t;;) { //2.從頂開始的索引j,每次向下找一個. long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) //3.取出的是空,返回值取決於top是否是內層循環是第一次運行,外循環每次會將s更新爲新top, //內循環則會每次將s減一.內循環只跑了一次的狀況,顯然會返回true. //顯然這種狀況下top也沒有被其餘線程更新,內循環又是第一次跑,那麼將足以說明當前隊列爲空,該爲false. //true的狀況,向下遍歷了幾個元素打到了底,未進入46 10這三種要重開啓一輪外循環的狀況,也沒找到task. //無論怎樣,發現空任務就返回. return s + 1 == top;// 比預期短,第一個或第n個出現了空值,但循環條件未false else if (t == task) { //找到的任務t不是空,且是目標任務. boolean removed = false; if (s + 1 == top) { //4.發現是首輪內循環,s+1==top成立,進行pop操做,將task彈出並將top減一. //顯然,task是最頂任務,能夠用pop方式,將它置空. if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); //5.置removed爲true. removed = true; } } //6.不是首輪循環,並且base沒有在處理期間發生改變. else if (base == b) //7.嘗試將task替換成一個EmptyTask實例.成功則removed是true, //這樣雖然該任務出了隊,但在隊上還有一個空的任務,而不會出現前面擔憂的中間null //的狀況,也不改變top或base的值. removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed) //8.只要任務成功出隊(不管是4仍是7,則執行. task.doExec(); //9.只要找到任務,退出內循環,回到外循環重置相應的條件. break; } //10.本輪內循環沒找到匹配task的任務. else if (t.status < 0 && s + 1 == top) {//官方註釋是取消. //11.若t是完成的任務且是首輪內循環且top未變更,將該任務出隊並令top減一. if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); //12.只要進入此分支就退出內循環. break; } if (--n == 0) //13.內循環每執行到此一次,就說明有一次沒找到目標任務,減小n(開始時的base top差值).達0時返回false中止循環. //即每一個內循環都只能執行n次,進入外循環時重置n. return false; } //14.結束了任何一輪內循環時,發現目標task已經完成,則中止外循環返回false. if (task.status < 0) return false; } } //15.task參數傳空,或者當前WorkQueue沒有任務,直接返回true. return true; } //簡單梳理一下tryRemoveAndExec的執行流程和生命週期. //a.顯然,一上來就判隊列的空和參數的空,若是第一個if都進不去,按約定返回true. //b.通過1初始化一個內層循環,並初始化了n,它決定內循環最多跑n次,若是內循環一直不break(9找到任務或12發現頂部任務是完成態),也假定通常碰不到14(發現目標任務完成了) //也沒有出現幾種return(3查出null,14某輪內循環目標task發現被完成了),那麼最終只會耗盡次數,遍歷到底,在13處return false(肯定此輪循環task不在隊列) //c.若是出現了幾種break(9,12),9其實表明查到任務,12表明頂部任務已完成(官方說取消),那就會中止內循環,從新開啓一輪外循環,初始化n,繼續重新的top到base遍歷(b). //但此時,可能找不到task了(它已經在上一輪內循環出隊或被替換成代理),但也可能實際上未出隊(該task不是top,即4,base也發生了改變形成7未執行),那麼可能在本輪循環 //找到任務,在b中進入相應的break,而且成功移除並會進入d,也可能沒進入break而是再重複一次b. //d.若是某一次break成功刪除了任務,那麼外循環更新了n,base,top,重啓了一次內循環,可是全部找到task的分支不會再有了,若是接下來再也不碰到被完成(取消)的頂部任務11-12, //一樣也沒發現目標task完成了(不進14),那麼最終的結果就是n次內循環後n下降到0,直接return false. //e.從b-d任何一次內循環在最後發現了task結束,當即返回false.不然,它可能在某一次內循環中彈出並執行了該任務,卻可能一直在等待它完成,所以這個機制可讓等待task完成前, //幫助當前WorkQueue清理頂部無效任務等操做. //此方法適用於不論共享或者獨有的模式,只在helpComplete時使用. //它會彈出和task相同的CountedCompleter,在前一節講解CountedCompleter時已介紹過此方法. //父Completer僅能在棧鏈上找到它的父和祖先completer並幫助減掛起任務數或完成root,但在此處 //它能夠幫助棧鏈上的前置(子任務),前提是要popCC彈出. final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) { int s; ForkJoinTask<?>[] a; Object o; //當前隊列有元素. if (base - (s = top) < 0 && (a = array) != null) { //老邏輯從頂部肯定j. long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; if ((o = U.getObjectVolatile(a, j)) != null && (o instanceof CountedCompleter)) { //當前隊列中存在類型爲CountedCompleter的元素.對該completer棧鏈開啓一個循環. CountedCompleter<?> t = (CountedCompleter<?>)o; for (CountedCompleter<?> r = t;;) { //對該CountedCompleter及它的completer棧元素進行遍歷,每個遍歷到的臨時存放r. //找到r==task,說明有一個completer位於task的執行路徑. if (r == task) { //mode小於0,這個mode其實有誤解性,它的調用者實際上是將一個WorkQueue的config傳給了這個mode. //而config只有兩處初始化,一是將線程註冊到池的時候,初始化WorkQueue, //二是外部提交的任務,使用externalSubmit時新建的WorkQueue,config會是負值且沒有owner. //它也說明是共享隊列,須要有鎖定機制.. if (mode < 0) { //另外一個字段qlock派上了用場,將它置爲1表示加鎖. if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { //加鎖成功,在top和array這過程當中未發生變更的狀況下,嘗試 //將t出隊,此時t是棧頂上的元素,它的completer棧鏈前方有task. if (top == s && array == a && U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); U.putOrderedInt(this, QLOCK, 0); return t; } //不論出隊成功仍是失敗,解鎖. U.compareAndSwapInt(this, QLOCK, 1, 0); } } //非共享隊列,直接將t出列. else if (U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); return t; } //只要找到,哪怕兩處cas出現不成功的狀況,也是競態失敗,break終止循環. break; } //r不等於task,找出r的父並開始下輪循環,直到root或找到task爲止. else if ((r = r.completer) == null) // try parent break; } } } //空隊列,頂部不是Completer或者不是task的子任務,返回null. return null; } //嘗試在無競態下偷取此WorkQueue中與給定task處於同一個completer棧鏈上的任務並運行它, //若不成功,返回一個校驗合/控制信號給調用它的helpComplete方法. //返回規則,成功偷取則返回1;返回2表明可重試(被其餘小偷擊敗),若是隊列非空但未找到匹配task,返回-1, //其餘狀況返回一個強制負的基準索引. final int pollAndExecCC(CountedCompleter<?> task) { int b, h; ForkJoinTask<?>[] a; Object o; if ((b = base) - top >= 0 || (a = array) == null) //空隊列,與最小整數(負值)取或做爲信號h h = b | Integer.MIN_VALUE; // to sense movement on re-poll else { //從底部取索引j long j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用該索引取task取出null,說明被捷足先登了,信號置爲可重試. if ((o = U.getObjectVolatile(a, j)) == null) h = 2; // retryable //取出的非空任務類型不是CountedCompleter.說明不匹配,信號-1 else if (!(o instanceof CountedCompleter)) h = -1; // unmatchable else { //是CountedCompleter類型 CountedCompleter<?> t = (CountedCompleter<?>)o; for (CountedCompleter<?> r = t;;) { //基本同上個方法的邏輯,只是上個方法t取的是top,這裏取base. //r從t開始找它的父,直到它自己或它的父等於task.將它從底端出隊. if (r == task) { if (base == b && U.compareAndSwapObject(a, j, t, null)) { //出隊成功,由於咱們找的是base,且競態成功,直接更新base便可. base = b + 1; //出隊後執行該出隊的任務.返回1表明成功. t.doExec(); h = 1; // success } //base被其餘線程修改了,或者cas競態失敗.(實際上是一個狀況),信號2,能夠重新的base開始重試. else h = 2; // lost CAS //只要找到task的子任務就break,返回競態成功或可重試的信號. break; } //迭代函數,當前r不是task,將r指向它的父,直到某一個r的父是task或者是null進入else if. else if ((r = r.completer) == null) { //可以進來,說明r已經指向了root,卻沒有找到整條鏈上有這個task,返回信號爲未匹配到. h = -1; // unmatched break; } } } } return h; } //若是當前線程擁有此隊列且明顯未被鎖定,返回true. final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; //前面提過的scanState會在一上來runTask時和1的反碼取與運算,直到運行完任務纔會反向運算. //這個過程,scanState的最後一位會置0,但這與此判斷條件關係不大. //前面對scanState有所註釋,小於0表明不活躍. return (scanState >= 0 && //隊列處於活躍態且當前線程的狀態不是阻塞,不是等待,不是定時等待,則返回true. (wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } }
到此終於WorkQueue內部類的代碼告一段落.
這一段介紹了WorkQueue的內部實現機制,以及與上一節有關的提到的CountedCompleter在幫助子任務時處於WorkQueue的實現細節(彷佛默認狀況下即asnycMode傳true時只會從當前工做線程隊列取頂部元素,從其餘隨機隊列的底部開取,有可能能夠重複取,具體細節到ForkJoinPool的helpComplete相關源碼再說),以及構建好的WorkQueue會有哪些可能的狀態和相應的字段,以及若干模式(同步異步或者LIFO,FIFO等),出隊入隊的操做,還提出了隊列中元素爲何中間不能爲空,若是出現要將中間元素出隊怎麼辦?別忘了答案是換成一個EmptyTask.
不妨小結一下WorkQueue的大體結構.
1.它規避了僞共享.
2.它用scanState表示運行狀態,版本號,小於0表明不活躍維護了忙碌標記,也用scanState在runTask入口開始運行任務時標記爲忙碌(偶數),結束後再取消忙碌狀態(奇數).註釋解釋奇數表明正在掃描,但從代碼語義上看正好相反
3.它維護了一個能夠擴容的數組,也維護了足夠大的top和base,[base,top)或許能夠形象地表示它的集合,pop是從top-1開始,poll從base開始,當任務壓入隊成功後,檢查若top-base達到了數組長度,也就是集合[base,top)的元素數達到或者超過了隊列數組長度,將對數組進行擴容,因使用數組長度-1與哈希值的方式,擴容先後原數組元素索引不變,新壓入隊列的元素將在此基礎上無縫添加,所以擴容也規避了出現中間任務null的狀況.初始容量在runWorker時分配.
4.它維護了偷取任務的記錄和個數,並在溢出等狀況及時累加給池.它也維護了阻塞者線程和主人線程.
5.它可能沒有主人線程(共享隊列),或有主人線程(非共享,註冊入池時生成)
6.它維護了隊列鎖qlock,但目前僅在popCC且當前爲共享隊列狀況下使用,保證爭搶的同步.
7.其餘一些字段如config,currentJoin,hint,parker等,須要在後續的線程池自身代碼中結合前面的源碼繼續瞭解,包含stackPred聽說保持前一個池棧的運行信號.
WorkQueue本質也是一個內部類,它雖然定義了一系列實現,但這些實現方法的調度仍是由ForkJoinPool來實現,因此咱們仍是要回歸到ForkJoinPool自身的方法和公有api上,遇到使用上面WorkQueue定義好的工具方法時,咱們再來回顧.
前面已經看了一些影響WorkQueue的位於ForkJoinPool的常量,再來繼續看其餘的ForkJoinPool中的一些常量.
//默認線程工廠.前面提過兩個實現 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; //是否容許啓動者在方法中殺死線程的許可,咱們忽略這方面的內容. private static final RuntimePermission modifyThreadPermission; //靜態的common池 static final ForkJoinPool common; common池的並行度. static final int commonParallelism; //tryComensate方法中對構造備用線程的創造. private static int commonMaxSpares; //池順序號,建立工做線程會拼接在名稱上. private static int poolNumberSequence; //同步方法同,遞增的池id. private static final synchronized int nextPoolId() { return ++poolNumberSequence; } // 如下爲一些靜態配置常量. //IDLE_TIMEOUT表明了一個初始的納秒單位的超時時間,默認爲2s,它用於線程觸發靜止停頓以等待新的任務. //一旦超過了這個 時長,線程將會嘗試收縮worker數量.爲了不某些如長gc等停頓的影響,這個值應該足夠大 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec //爲應對定時器下衝設置的空閒超時容忍度. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms //它是commonMaxSpares靜態初始化時的初值,這個值遠超普通的須要,但距離 //MAX_CAP和通常的操做系統線程限制要差很遠,這也使得jvm可以在資源耗盡前 //捕獲資源的濫用. private static final int DEFAULT_COMMON_MAX_SPARES = 256; //在block以前自旋等待的次數,它在awaitRunStateLock方法和awaitWork方法中使用, //但它事實上是0,所以這兩個方法其實在用隨機的自旋次數,設置爲0也減小了cpu的使用. //若是將它的值改成大於0的值,那麼必須設置爲2的冪,至少4.這個值設置達到2048已經能夠 //耗費通常上下文切換時間的一小部分. private static final int SPINS = 0; //種子生成器的默認增量.註冊新worker時詳述. private static final int SEED_INCREMENT = 0x9e3779b9;
上面都是一些常量的聲明定義,下面看一些與線程池config和ctl有關的常量,以及前面構造器提過的變量.
// 高低位 private static final long SP_MASK = 0xffffffffL;//long型低32位. private static final long UC_MASK = ~SP_MASK;//long型高32位. // 活躍數. private static final int AC_SHIFT = 48;//移位偏移量,如左移到49位開始. private static final long AC_UNIT = 0x0001L << AC_SHIFT;//1<<48表明一個活躍數單位. private static final long AC_MASK = 0xffffL << AC_SHIFT;//long型高16位(49-64) // 總數量 private static final int TC_SHIFT = 32;//移位偏移量,33位開始. private static final long TC_UNIT = 0x0001L << TC_SHIFT;//1<<32表明一個總數量 private static final long TC_MASK = 0xffffL << TC_SHIFT;//33-48位 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位 //與運行狀態有關的位,顯而後面的runState是個int型,這些移位數也明顯是int的範圍. //SHUTDOWN顯然必定是負值,其餘值也都是2的冪. private static final int RSLOCK = 1;//run state鎖,簡單來講就是奇偶位. private static final int RSIGNAL = 1 << 1;//2 運行狀態的喚醒. private static final int STARTED = 1 << 2;//4,啓動 private static final int STOP = 1 << 29;//30位,表明停. private static final int TERMINATED = 1 << 30;//31位表明終止. private static final int SHUTDOWN = 1 << 31;//32位表明關閉. //實例字段. volatile long ctl; // 表明池的主要控制信號,long型 volatile int runState; // 能夠鎖的運行狀態 final int config; // 同時保存了並行度和模式(開篇的構造函數) int indexSeed; // 索引種子,生成worker的索引 volatile WorkQueue[] workQueues; // 工做隊列的註冊數組. final ForkJoinWorkerThreadFactory factory;//線程工廠 final UncaughtExceptionHandler ueh; // 每個worker線程的未捕獲異常處理器. final String workerNamePrefix; // 工做線程名稱前綴. volatile AtomicLong stealCounter; // 表明偷取任務數量,前面提過,官方註釋說也用做同步監視器
僅僅看這些字段的簡單描述是沒法完全搞清楚它們的含義的,仍是要到應用的代碼來看,咱們繼續向下看ForkJoinPool中的一些方法.
//嘗試對當前的runState加鎖標誌位,並返回一個runState,這個runState多是原值(無競態)或新值(競態且成功). //不太準確的語言能夠說是"鎖住"runState這個字段,其實不是,從代碼上下文看, //該標誌位被設置爲1的期間,嘗試去lock的線程能夠去更改runState的其餘位,好比信號位. //而lockRunState成功的線程則是緊接着去更改ctl控制信號,工做隊列等運行時數據,故能夠稱runState在鎖標誌這一塊 //能夠理解爲運行狀態鎖. private int lockRunState() { int rs; //runState已是奇數,表示已經鎖上了,awaitRunState return ((((rs = runState) & RSLOCK) != 0 || //發現本來沒鎖,嘗試將原rs置爲rs+1,即變爲奇數. !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? //原來鎖了或者嘗試競態加鎖不成功,等待加鎖成功,不然直接返回rs. awaitRunStateLock() : rs); } //自旋或阻塞等待runstate鎖可用,這與上面的runState字段有關.也是上一個方法的自旋+阻塞實現. private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //每輪循環重讀rs. if (((rs = runState) & RSLOCK) == 0) { //1.發現rs仍是偶數,嘗試將它置爲奇數.(鎖) if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { //2,鎖成功後發現擾動了,則擾動當前線程,catch住不符合安全策略的狀況. if (wasInterrupted) { try { //2.1擾動.它將影響到後面awaitWork方法的使用. Thread.currentThread().interrupt(); } catch (SecurityException ignore) { } } //2.2返回的是新的runStatus,至關於原+1,是個奇數. //注意,此方法中只有此處一個出口,也就是說必需要鎖到結果. return ns; } } //在1中發現被鎖了或者2處爭鎖競態失敗. else if (r == 0) //3.全部循環中只會執行一次,若是簡單去看,nextSecondarySeed是一個生成 //僞隨機數的代碼,它不會返回0值.r的初值是0. r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { //4.有自旋次數,則將r的值進行一些轉換並開啓下輪循環.默認spins是0,不會有自旋次數. //從源碼來看,自旋的惟一做用就是改變r的值,使之可能從新進入3,也會根據r的結果決定是否減 //少一次自旋. //r的算法,將當前r的後6位保留,用r的後26位與前26位異或被保存爲r的前26位(a). //再將(a)的結果處理,r的前21位保持不變,後11位與前11位異或並保存爲r的後11位(b). //再將(b)的結果處理,r的後7位保持不變,用前25位與後25位異或並保存爲r的前25位(c) //箇中數學原理,有興趣的研究一下吧. //顯然,自旋次數並非循環次數,它只能決定進入6中鎖代碼塊前要運行至少幾輪循環. r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) //通過上面的折騰r還不小於0的,減小一個自旋次數. //自旋次數不是每次循環都減一,但減到0以後不表明方法中止循環,而是進入2(成功)或者6(阻塞). --spins; } //某一次循環,r不爲0,不能進入3,自旋次數也不剩餘,不能進入4.則到此. else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) //5.線程池的runState表示還未開啓,或者還未初始化偷鎖(stealCounter),說明 //還沒完成初始化,此處是初始化時的競態,直接讓出當前線程的執行權.等到從新獲取執行權時, //從新循環,讀取新的runState並進行. Thread.yield(); // initialization race else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {//可重入 //6.沒能對runState加鎖,也不是5中的初始化時競態的狀況,嘗試加上信號位,以stealCounter進行加鎖. //顯然,這種加信號位的加法不會由於信號位而失敗,而會由於runState的其餘字段好比鎖標識位失敗,這時 //從新開始循環便可. synchronized (lock) { //明顯的double check if ((runState & RSIGNAL) != 0) { //6.1當前pool的runState有信號位的值,說明沒有線程去釋放信號位. try { //6.2runState期間沒有被去除信號位,等待. lock.wait(); } catch (InterruptedException ie) { //6.3等待過程當中發生異常,且不是記錄一個標記,在2處會因它中斷當前線程. if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else //6.4當前runState沒有信號位的值,說明被釋放了,順便喚醒等待同步塊的線程.讓他們繼續轉圈. lock.notifyAll(); } } } } //解鎖runState,前面解釋過,這個鎖能夠理解爲對runState的鎖標誌位進行設定,而設定成功的結果就是能夠改信號量ctl. //它會解鎖runState,並會用新的runState替換. private void unlockRunState(int oldRunState, int newRunState) { //首先嚐試cas.cas成功可能會致使上一個方法中進入同步塊的線程改走6.4喚醒阻塞線程. if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { //cas不成功,直接強制更改. Object lock = stealCounter; runState = newRunState;// 這一步可能清除掉信號位.使上一個方法中已進入同步塊的線程改走6.4 if (lock != null) //強制更換爲新的運行狀態後,喚醒全部等待lock的線程. synchronized (lock) { lock.notifyAll(); } } }
上面的幾個方法是對runState字段進行操做的,並利用了信號位,鎖標識位,運行狀態位.
顯然,雖然能夠不精確地說加鎖解鎖是對runState的鎖標識位進行設置,嚴格來講,這倒是爲ctl/工做隊列等運行時數據服務的(後面再述),顯然精確說是對運行時數據的修改權限加鎖.
一樣的,加鎖過程採用自旋+阻塞的方式,整個循環中同時兼容了線程池還在初始化(處理方式讓出執行權),設定了自旋次數(處理方式,隨機數判斷要不要減小自旋次數,自旋次數降0前不會阻塞)這兩種狀況,也順便在阻塞被擾動的狀況下暫時忽略擾動,只在成功設置鎖標識位後順手負責擾動當前線程.
簡單剝離這三種狀況,加鎖過程是一輪輪的循環,會嘗試設置鎖標識位,成功則返回新標識,不成功則去設置信號位(可能已經有其餘線程設置過了),設置信號位也是使用原子更改,即便其餘線程設置過信號位,原子更改也會成功,惟一能形成失敗的是runState的其餘位發生變動,並且極可能是由於鎖標識位被釋放的緣由.
unlock操做也並不複雜,若是傳入的newRunState參數依舊錶明瞭鎖,那麼不會有任何效果.這裏只考慮有效果的狀況,即取消了鎖(註釋說必須取消鎖標識位,unlockRunState的方法確實都這麼作的),若沒有其餘線程競態修改runState,則解鎖直接經過一個cas成功,也不須要去喚醒其餘線程.不然在解鎖操做嘗試去用cas釋放鎖標識位的時候沒有成功,說明在unlock操做的線程讀取到runState後又有其餘線程對它進行了更改,那麼直接暴力重置爲newState並喚醒阻塞線程.
這彷佛存在一個bug.若是加鎖時出現了競態,若干個加鎖線程被阻塞,此時信號位和鎖位都有值,過了一會,有一個線程去解鎖,在解鎖前,它先讀取了如今的runState,而後用cas去修改,由於此時沒有競態,解鎖線程在讀取到runState到cas期間,沒有任何線程去改過runState,那麼解鎖線程直接就會cas成功,而不會去喚醒前面阻塞的線程.
不過還有一個解鎖的地方,去嘗試加鎖的線程,在同步代碼塊內發現runState的信號位被釋放,就當即喚醒全部阻塞的線程,但若是此後沒有新的加鎖線程進入呢?或者沒有人將runState的信號位取消呢?
因而做者仔細查看了runState的設置,沒有任何一個地方顯式地將信號位釋放,所以一度判斷在awaitRunStateLock方法中不會有任何線程去喚醒其餘等待加鎖的線程.
因此在上述方法內部這個底層的層面是不能解決線程鎖死的.但在加鎖lockRunState和解鎖unlockRunState兩個方法的調用者處來看,每個線程都是先加鎖後解鎖,而且在加鎖後記錄當時的rs,解鎖時嘗試用該rs去cas,若能成功,說明本身加鎖到解鎖期間沒有任何線程嘗試加鎖(嘗試加鎖不成會修改runState致使cas失敗).所以直接cas釋放就好了,但若是發生了其餘線程在它解鎖前的加鎖阻塞,則前一個線程在解鎖時會cas失敗,所以將強制轉換爲本身加鎖時記錄的rs去除鎖標識位的結果(思考:這個結果會不會包含信號位?)並喚醒等待線程.
涉及到信號位的狀況有明顯的邏輯暗坑,做者在註釋中提出大量問題(好比是否是一旦設置了信號位就不會取消,確實代碼中沒有明顯的信號位取消代碼),若是不看接下來的這一段,極可能會認爲道格大神終於寫出了bug.這一塊要費大腦分析,前面說過,沒有看到任何地方去明確釋放信號位,好比runState=rs&~RSIGNAL或者相應的unsafe機制.並且這個機制的安全性不能自我保證,它取決於調用者的實現,此處還未講到相關代碼,鑑於讀者可能與做者有一樣的急於尋得這個問題答案的心情,咱們先不去羅列更冗長的加解鎖使用代碼,而先用語言來形容這個過程,原來這也是一件實現很巧妙的事:
開始狀態,假定有多個線程去嘗試加鎖,操做完再解鎖,咱們來分析一波,看看信號位是否會重置.
1.首先,第一個線程毫無疑問會讀取到原來的runState,並把它賦給一個臨時變量rs(有關代碼後面會貼),而後在一系列操做後,將rs去除鎖標識位(哪怕它根本沒置過鎖標識位),做爲newState調用unlock.
2.若是在第一個線程加鎖到解鎖期間,rs從未變過,那麼該線程會直接cas成功.
3.若是第一個線程加鎖到解鎖期間,有n個線程嘗試加鎖,那麼他們會阻塞在lockRunState方法的同步塊(忽略前面提到的自旋等操做),而且會更改runState到新的值(加信號位),此時第一個線程記錄的rs會成爲髒數據.因此當第一個線程去解鎖時(由於做者看到全部加解鎖都是成對出現的,不會有後續的線程在第一個線程解鎖前去嘗試解鎖),cas會失敗,它會將本身原來髒數據記錄的rs去除鎖標識位後強制設置爲runState的新值,而後喚醒阻塞的線程.由於它沒有阻塞過,設置原來的rs不會包含信號位,至關於清除了阻塞線程設置的信號位.
4.由於剛纔cas失敗,阻塞的線程在同步塊中被喚醒,阻塞在synchronized外的線程進代碼塊double check發現rs中已經沒有信號位了,也幫助喚醒其餘線程.
5.阻塞的線程被喚醒後(4中已經進入if後的wait線程)開啓了新的循環,有的競態成功並返回了ns(它與第一個線程返回值的差距是鎖標識位是1,但它一樣沒有信號位),其餘線程再次競態失敗阻塞,並更改了runState(加上了標識位).
6.此時可能有(4)被喚醒的某些線程(4中lockRunState進同步塊判斷)發現runState仍是有信號位的(由於早於它喚醒的至少兩個線程,一個成功,一個失敗再次設置信號位),它進入if並wait.
7.不論後方阻塞了多少個線程,5中成功的線程再次嘗試使用本身在lock成功時返回的ns(rs)去除鎖標識位去cas掉當前runState,由於後面有競態,顯然它也會失敗,那麼強制使用本身前面保存的,乾淨地未受信號位污染的rs在去除鎖標識位後替換了已經被阻塞者設置的runState.而後再次進入喚醒操做.
顯然,加鎖解鎖必須成對出現,僅有加了鎖成功的線程才能夠解鎖,本身加鎖本身解,只解本身的鎖.萬古定律在此也要堅持.
道格大神邏輯強大,做者卻不善描述,寥寥幾行代碼,區區一個信號位問題,卻用了上面一整大段來論述.感嘆一下差距.
接下來終於能夠看工做線程的建立,註冊入池和從池中卸載的過程.
//建立工做線程的代碼.嘗試工做線程的構建和開啓,它假定已經有別處維護了預留的增長總數, //建立和啓動過程當中出現任何異常,就執行工做線程的卸載.建立線程成功返回true. private boolean createWorker() { //前面構造器傳入的factory. ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //建立線程成功 if (fac != null && (wt = fac.newThread(this)) != null) { //啓動該線程. wt.start(); //啓動也成功,返回true return true; } } catch (Throwable rex) { //出現異常,保存 ex = rex; } //用前面的異常卸載. deregisterWorker(wt, ex); //返回false. return false; } //createWorker的調用者tryAddWorker //嘗試添加一個worker,並在完成前增長ctl裏面記錄的數量(還記得前面的AC參數嗎?),但這個 //增長過程是否進行決定於createWorker返回的true仍是false.參數c是一個進入控制信號ctl. //它的總計數爲負且沒有空閒worker,cas(增長ctl)失敗時,若ctl未改變,則能夠刷新重試. //不然說明被添加了一個worker,那麼它也就不須要再繼續了.從方法的實現上看,c彷佛能夠傳任何值, //若是c傳入的值不等於當前ctl,則會多一次循環重讀ctl到c. private void tryAddWorker(long c) { boolean add = false;//1.標記add成功與否. do { //2.根據參數c生成一個新的ctrl/nc.c源自參數或者某一次循環讀取的ctl. //nc的值計算結果:c加上一個活躍單位1<<48,並對結果保留前16位, //c加上一個總數單位1<<32,並對結果保留第二個16位(33到48位), //nc等於上兩步的結果和.顯然,nc的後32位所有是0. long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); //3.上述計算過程當中ctl未改變. if (ctl == c) { //4.阻塞加鎖並判斷是否已在終止. int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) //5.加鎖成功且未終止,嘗試cas掉ctl. add = U.compareAndSwapLong(this, CTL, c, nc); //6.加鎖成功,不論cas ctl是否成功,解鎖. unlockRunState(rs, rs & ~RSLOCK); //7.若是已stop,break退出添加worker的步驟. if (stop != 0) break; if (add) { //8.加鎖成功,cas也成功,線程池未進入終止流程,建立worker. createWorker(); //建立成功當即break. break; } } //9.在2到3中間計算nc過程當中ctl改變,或4處未終止且5處未成功cas,則c讀取新的ctl,並判斷是否還要添加worker(新ctl存在ADD_WORKER位)且新ctl的後32位不存在數值. //這時爲前32位發生了變化,只能在新一輪循環處理.注:ADD_WORKER位是第48位,前面已提到.它是TC_MASK能負責的最高位. } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); }
前面是嘗試建立worker和決定建立worker的調度流程.
顯然,createWorker沒有什麼可冗餘介紹的,只須要了解上一文中的工做線程和線程工廠相關知識,很明顯,它不維護線程數量等規範,只負責在失敗狀況下解除建立完線程的註冊.
核心的邏輯在tryAddWorker中.
顯然,tryAddWorker支持傳入非ctl的參數c,代價是必定會多一次循環.若是傳入的c就是方法外讀取的ctl,且未發生競態,效果一是少一輪循環,效果二是不去判斷while循環的條件,即不須要ctl具有ADD_WORKER標記位且ctl的後32位無值.
整體流程:
1.根據傳入的c,對它加上active 數單位,總數單位,並去除整數位,獲得的結果即nc.
2.判斷一次ctl和c是否相等,不相等可能一是競態,可能二是傳參錯誤.
3.ctl和c相等,阻塞加鎖(前面分析過,lockRunState必定要阻塞到加鎖成功爲止,所以本方法不會讓加鎖失敗的線程去釋放鎖),前面說過,runState加鎖這一塊,本人暫時描述其鎖定的對象爲"運行時狀態",可見,工做線程也是運行時狀態之一.
4.加鎖成功且加鎖過程當中沒有開始終止,嘗試將ctl用cas設置成nc,這樣至關於同時增長了一個活躍單位和一個總數單位,並忽略掉整數位.
5.若是加鎖期間(3)發生了線程池終止則退出,若是加鎖並設置nc成功,則建立線程.
6.在2判斷條件不成立時,或4中cas失敗,重初始化ctl並驗證是否符合循環開始條件,即ctl知足後32位爲0且存在ADD_WORKER信號位.若不知足這個條件則直接退出循環,終止添加嘗試.
顯然,此方法容許傳入一個非法的c,只要符合循環條件,它會在白白計算一次nc後開始第二輪循環(2條件失敗),while條件後置,而6處的while條件比2處要更加嚴格,顯然,當傳入的c就是ctl且在計算nc期間未發生改變,則有機會成功cas掉舊的ctl並建立工做線程,即便不知足ADD_WORKER條件和整數位全0的條件.這是爲第一次循環單獨開的綠燈,也是do while循環的緣由.那麼這個"第一次"的綠燈究意爲什麼而開?咱們稍後分析.
還有6中的循環條件的32位整數問題,咱們也稍後分析,循環條件的另外一個特殊意義:除第一次循環參數傳入c的狀況外,其餘若干次循環可以成功添加worker(突破2的驗證)的條件是前32位不變,顯然這取決於其餘線程是否成功改了ctl到它計算的nc(這個操做會同時改變前16位和前17-32位).
至於ADD_WORKER位,顯然涉及線程個數,到此須要回憶一下最開頭列出的構造器.它是第48位,默認是1,顯然每添加一個worker,會在第49位和33位加一個1,添加到parallel個時,ctl將歸0(重點是tc高位將由1變0),而48位也會由1變0.(沒有其餘方法干擾的狀況)
顯然,ForkJoinPool的幾種公有構造器最終須要依託一個私有構造函數進行構造,而它的構造器沒有顯式調用父類的構造方法,構造器參數也沒有coreSize之類的能決定核心線程和最大線程數的有關設置.
是否是能夠理解爲構造ForkJoinPool時傳入的並行度值就是線程池的最大線程數量?
不嚴格說,能夠這樣理解.但咱們已經講完tryAddWorker方法,顯然,只要在方法外讀出c=ctl,並調用tryAddWorker(c),只要此時沒有其餘線程修改ctl,也沒有人終止線程池,顯然必定會成功,隨後我再次讀取新的c=ctl,往復這個過程,每一次都會成功,而實際上的線程數早已大於併發度.
到此終於能夠簡單理解ADD_WORKER和TC_UNIT,AC_UNIT了.ac字面意思是活躍線程數,TC則是總數,所以用tc的最高位(48)表示,當加入的超過總數,它會溢出.
接下來看線程註冊入池的方法.
//將線程註冊入池,同時返回一個WorkQueue,前一篇文章中提到過此方法,工做線程會在內部記錄這個隊列. final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; //設置成守護線程,這樣保證用戶線程都已釋放的狀況下關閉虛擬機. wt.setDaemon(true); if ((handler = ueh) != null) //構造器提供了異常處理器 wt.setUncaughtExceptionHandler(handler); //構建工做隊列. WorkQueue w = new WorkQueue(this, wt); int i = 0; //取出池config的17-32位保存爲mode, //前面提過,config在構造時由並行度(後15位)和模式(第17位)表示,根據17位是否有值決定FIFO或LIFO. //這個與運算進行後,至關於濾掉了並行度信息. int mode = config & MODE_MASK; //建立完隊列以後要加鎖,尤爲後面涉及到可能的數組擴容拷貝,以及一些判斷和重設隨機數等. int rs = lockRunState(); try { WorkQueue[] ws; int n; //前面構造器咱們看過,沒有初始化workQueues,因此若是一個線程此時來註冊是被忽略的. //顯然,使用它們的方法必定作了相應的保證.咱們後續再看. if ((ws = workQueues) != null && (n = ws.length) > 0) { //隊列非空,遞增種子. //indexSeed值是0,SEED_INCREMENT是每次相加的增量,它的值默認是0x9e3779b9(2654435769) //這是一個特殊的值,它的使用不只此一處,後面稍微介紹.這樣減小碰撞可能性. int s = indexSeed += SEED_INCREMENT; int m = n - 1;//ws數組長度-1,數組長度必定是偶數(後面介紹). i = ((s << 1) | 1) & m;//奇數位i. if (ws[i] != null) { //知足這個條件就是發生碰撞了,i已被佔用.初始化probes爲0 int probes = 0; //定義步長,數組長度不大於4,步長2,不然取n一半的第2至16位的結果(偶數)再加上2做爲步長. int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; //開啓循環,每次對i加上步長並與m求與運算,直到無碰撞爲止. while (ws[i = (i + step) & m] != null) { //每次循環增長probes,表示對同一個數組最多隻循環n次,達到次數要進行擴容重試. if (++probes >= n) { //當前數組已經嘗試n次,尚未找到無碰撞點,擴容數組一倍,原位置拷貝. //此處沒有任何加鎖動做,與循環以外建立好隊列以後的代碼共享一個鎖,也是lockRunState //可見只有指派索引相關的動做才須要加鎖. workQueues = ws = Arrays.copyOf(ws, n <<= 1); //重置條件. m = n - 1; probes = 0; } } } //s值保存給隊列的hint做爲隨機數種子.可見,此處至少可說明每一個註冊線程時建立的隊列都會有不一樣的hint,它也算是一個標識. w.hint = s; //隊列的配置,i與mode取或,mode只能是0或1<<16 //這個結果是將mode可能存放在隊列config的17位,從而和池中的config在模式這一塊保持一致. //i必定是一個不大於m(n-1)的奇數,而n必定不超事後16位(後面敘述),它和mode互不影響. //故隊列的config至關於同時保存了在池的workQueues數組的索引和所屬池的FIFO或LIFO. w.config = i | mode; //初始化scanState,以奇數i(索引)當值.至關於發佈了初始屏障. //(不理解?參考runState方法,一上來就將它的末位置0成偶數) w.scanState = i; //新建的隊列置於i處. ws[i] = w; } } finally { //解鎖. unlockRunState(rs, rs & ~RSLOCK); } //線程名,這裏看到一個有趣的事,無符號右移,i必定是個奇數,假定右移後的值是j,則2*j=i. wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); //返回隊列給線程. return w; }
如今回過頭來梳理一下registerWorker方法.
方法自己並不難理解,代碼也較檢查,難處在於要串起來前面全部有關內容.包含ForkJoinPool的構造器,包含ForkJoinPool中的config究竟是如何存放的,它進一步影響了新建隊列的config初始化.
咱們總結一下有關信息:
1.把線程註冊入ForkJoinPool,首先在整個方法的週期內,有一些咱們不須要重點費時間的雜項.如守護線程,異常處理器等設置,緊接着它當即爲線程初始化了一個工做隊列.
2.在1完成以前,不會進行加鎖操做,但後續涉及到將1初始化的工做隊列加入到ForkJoinPool內部維護的工做隊列數組(workQueues)中,須要爲它計算一個索引,而且應對可能的擴容操做,而這些步驟均須要加鎖進行.
3.在加鎖狀態下,也能過了池內隊列數組的非空驗證,註冊方法會嘗試使用全局的indexSeed 遞增 SEED_INCREMENT來肯定一個變量s,並將2s+1與數組長度減1進行與運算肯定索引值.其中,每一個線程都只會爲indexSeed加上一次的SEED_INCREMENT,這也是爲了減小衝突,而與workQueues數組長度(必定偶數)減1的與運算結果必定會是一個限於後16位的奇數,這說明,當前線程註冊入隊時的隊列只會放入到ws數組的奇索引上,註冊方法最後的代碼在對線程取名時,也將線程的號進行了索引位的無符號右移,這也側面說明了這一點.
關於SEED_INCREMENT,簡單說一下,最簡單的說法就是just math,純粹的數學了,用它是減小衝突的一種辦法,毫無疑問,s必定是n倍的SEED_INCREMENT,而2s+1與一個奇數進行與運算必定很難衝突,可是個中數學原理,做者倒是無從理解,也但願有路過的大牛幫助解釋.另外SEED_INCREMENT並不是使用一處,至少在咱們曾經數次一瞥的工具ThreadLocalRandom中有所使用,咱們曾屢次見到使用ThreadLocalRandom生成隨機數,好比這段代碼.
//前面看過此方法. static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else { //原來secondary是0,localInit一個值. localInit(); if ((r = (int)UNSAFE.getLong(t, SEED)) == 0) r = 1; // avoid zero } UNSAFE.putInt(t, SECONDARY, r); return r; } //localInit static final void localInit() { //生成器是一個AtomicLong,從0開始,每次加入PROBE_INCREMENT. int p = probeGenerator.addAndGet(PROBE_INCREMENT); int probe = (p == 0) ? 1 : p; // skip 0 long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT)); Thread t = Thread.currentThread(); UNSAFE.putLong(t, SEED, seed); UNSAFE.putInt(t, PROBE, probe); } //初值 private static final int PROBE_INCREMENT = 0x9e3779b9
顯然,它也出現了一個從0開始遞加的初值,用於後續規避衝突,而這些數是數學中的魔幻,PROBE_INCREMENT與SEED_INCREMENT是同一個值:0x9e3779b9.
做者對這個值十分陌生,也沒法從數學概念上對它作出過深理解,它爲何能保證n倍的結果乘2+1後可用於在與運算中規避衝突?只能強行理解了.
順嘴一提,高併發下的兩大神器:LongAdder與DoubleAdder,它們會用到一個PROBE,這也與它有關,在做者所在公司的分佈式主鍵生成器中,其實底層也是使用了它.繼續回到註冊線程.
4.當發生碰撞時,i的遞增步step也有神奇的算法,把n超過16位的那一半拿出來加上2,再去除到奇數屬性,就是步長了,這意味着步長至少是2(在n不超過16位的狀況,已經夠大了,步長只有多是2),顯然,n在足夠大(大於16位)前步長恆定爲2,大到16位以上時,步長會隨着n變大而變大.彷彿像中學時代的分段方程,或許這樣描述更好.
step={n<16位 2;n>16位,前16位的值+2去除奇偶位}
在計算好步長後,緊接着i=((i+step)&(n-1)),顯然,step大到16位以上,那麼它的前16位將會在新的索引i上有所體現.這個體現也許有什麼美妙的數學價值.
5.每當出現索引衝突,依舊重複34且記錄循環次數,當循環次數達到n時,由於加了運行狀態鎖,線程能夠放心地操做ForkJoinPool的狀態,故能夠擴容.當不衝突時,將i(一個奇數)賦給隊列的scanState,意味着它初始沒有人標記爲正在掃描,將隊列的config保存i和mode.最後解鎖並返回隊列給線程.創建相互引用.
註冊入池的簡短方法至此就簡單分析完畢了.它留給咱們一些疑問:
1.爲何註冊入池的線程生成的WorkQueue只佔用ForkJoinPool的workQueues的奇數位?這與工做竊取是否有關?
2.scanState的初始化終於明瞭,是一個奇數,且它也與前面的SEED_INCREMENT有關,它是否在後面是否也發揮了做用?
3.前面設置的異常處理器的做用?
4.此處解決碰撞,計算隊列位於數組的索引等都依賴於workQueues的初始大小(每次擴一倍),它的初始大小又是如何肯定的?建立時機?顯然在它未初始化的前提下,隊列入數組會靜默入池失敗,可是隊列卻成功建立並返回給線程了.
這些問題須要暫時記在大腦,當看到相應的方法時再予解釋.
//解除註冊操做.它是一個要終止的工做線程的最終回調,或者建立失敗時也會回調.在介紹ForkJoinWorkerThread和前面createWorker時提過. //這會從數組中移除worker記錄,調整數量.若是池已經處在關閉進行中,嘗試幫助完成池的關閉. //參數wt是工做線程,構建失敗會是null,ex是形成失敗的異常.它也能夠是null. final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { //1.處理隊列從池的隊列數組中的移除. WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { //存在工做線程且該工做線程有隊列的邏輯. WorkQueue[] ws; //前面說過,隊列的config後16位表示索引,第17位表示mode. int idx = w.config & SMASK; //加運行時狀態鎖. int rs = lockRunState(); if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) //簡單的置空操做. ws[idx] = null; //解鎖. unlockRunState(rs, rs & ~RSLOCK); } //2.處理控制信號中保存的數量. long c; //循環直到減數成功. 哪怕有別的線程在競態減小,當前方法也要在新的ctl中減小數量. do {} while (!U.compareAndSwapLong //第49位減1. (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | //第33位減1. (TC_MASK & (c - TC_UNIT)) | //保留後32位. (SP_MASK & c)))); //3.該工做線程有隊列,且已經在1出了數組. if (w != null) { //把隊列鎖設定負數. w.qlock = -1; //把隊列中記錄的偷取任務數加到池中.前面已論述過此方法. w.transferStealCount(this); //取消隊列中全部存活的任務. w.cancelAll(); } //4.進入循環嘗試幫助關閉池或釋放阻塞線程,補償線程等. for (;;) { WorkQueue[] ws; int m, sp; //4.1 tryTerminate後面介紹,第一個參數true表明無條件當即結束,第二個參數true //表明下次tryTerminate將能夠結束. if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) //進入if,說明正在結束或已結束,沒什麼可作的了. break; //4.2控制信號後32位有數值,進嘗試釋放邏輯.這不是第一次看到ctl的後32了.對於ctl的前32位, //咱們已經經過構造函數和前面的代碼說明,它初始化時與並行度有關,並在後面存放了添加worker數量 //的值(但不能說存放了並行度,由於添加worker會改變相應的位),後32位的真相也開始浮出水面, //在前面的tryAddWorker中,第二輪及之後的循環條件要求後32位不能存在值.並且添加成功也會 //將後32位置0,故tryAddWorker的第一輪循環會清空後32位,與此有所影響. if ((sp = (int)(c = ctl)) != 0) { //後32位有值,嘗試release,tryRealease方法會將activeCount數量添加第三個參數的值, //若是第二個參數表明的隊列是空閒worker的棧頂,則釋放其內的阻塞者. if (tryRelease(c, ws[sp & m], AC_UNIT)) break; //僅有此處釋放失敗的狀況下,開啓下一輪循環,其餘分支均會退出循環. } else if (ex != null && (c & ADD_WORKER) != 0L) { //5這次解除註冊是由於異常,且當前添加worker信號依舊知足,則添加一個worker代替原來並退出. tryAddWorker(c); break; } else // 6.不須要添加補償worker,退 break; } if (ex == null) //前面記錄的異常不存在,幫助清理髒異常節點. ForkJoinTask.helpExpungeStaleExceptions(); else //存在異常,重拋. ForkJoinTask.rethrow(ex); }
deregisterWorker邏輯並不複雜,把隊列移出池,減小count,清理queue中的任務,此處又見到了WorkQueue中的另外一個屬性的使用,qlock.顯然qlock值取-1時,表明隊列已經失效了.
隊列移除後,方法還會嘗試作一些非本職工做.如嘗試終結線程池,知足條件則退出循環(顯然每一個線程的卸載都嘗試觸發池的終結);線程池未進入終結過程,則嘗試釋放parker的邏輯(若是有),嘗試成功也會退出循環,此兩種狀況(tryTerminate或tryRelease)會形成忽略了異常等信息,只有在二者均未成功的前提下,前去考慮參數中的異常.異常的處理邏輯很簡單,存在即重拋.
//若是當前活躍線程數過少,嘗試去建立或活化一個worker. //參數ws是想要找到被喚醒者的隊列數組(也就是任何一個ForkJoinPool的成員變量), //參數q是個非空的隊列,則方法只嘗試一次,不會重試. final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { //1.添加worker步驟 //ctl小於0,表示active的太少.但彷佛也只能最多加上並行度的數量. if ((sp = (int)c) == 0) { //取ctl的後32位,終於,終於看明白了,這裏有一個註釋,sp==0表明無閒置worker. //但不表明後32位所有與閒置worker有關. if ((c & ADD_WORKER) != 0L) //ADD_WORKER位有值,說明總worker數量未達到. //通過三重關,添加worker. tryAddWorker(c); //知足添加worker的第一個條件,無閒置worker,不論有沒有成功建立新的worker,就都必定會退出循環. break; } //2.不存在空閒worker,驗證不知足喚醒流程的狀況. if (ws == null) //2.1隊列數組都還沒初始化,顯然池不知足條件. break; if (ws.length <= (i = sp & SMASK))// // 2.2隊列數組長度不大於ctl後16位.說明已進入終止態.退出(多像數組的length必定要大於索引) //又一次大揭密,ctl後16位彷佛與隊列數組的長度有關,並且存放的是一個索引. //此處隱含條件,ctl後32位不是0,將它的後16位取出來當索引i,要結合1處的條件. break; if ((v = ws[i]) == null) //2.3隊列數組長度正常, 使用索引(ctl的後15位)從ws中取不出隊列. //說明正在終止,退出. break; //3.知足了喚醒流程. //3.1計算數據,第一個爲下一個scanState,在前面的addWorker流程,咱們看到 //scanState的第一個值是在隊列數組中的索引.顯然索引不能亂變. //新的scanState值計算,老ctl的整數位在17位加1(SS_SEQ)再取它的後31位.顯然每次被喚醒都會走一次這個邏輯. int vs = (sp + SS_SEQ) & ~INACTIVE; int d = sp - v.scanState; //屏蔽沒必要要的cas. //3.2計算nc,它用老的ctl加一個活躍位(48位),而後只取出前32位,對後32位取出隊列v在上次掃描時存放的值(也是當時ctl的後32位). //這裏咱們又見到一個熟人:stackPred,接下來會有重要的方法使用它. long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); //3.3d是0說明ctl的後32位相對於原來v中存放的scanState沒有變化,那麼也就不須要cas. //d不是0,須要cas,用nc替換掉c. if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { //把v的scanDate置換成vs,激活了v. v.scanState = vs; if ((p = v.parker) != null) //有線程阻塞,喚醒. U.unpark(p); //激活成功,退出循環. break; } //4.上述過程沒有成功,看q是否提供,若是提供了不循環第二次. if (q != null && q.base == q.top) break; } }
signal這個方法自己邏輯並不複雜,重點在於咱們的幾點意外發現.
1.ctl的後16位原來能夠表示非終止狀態下線程池中的一個WorkQueue的索引.
2.scanState每次singal知足喚醒流程都會被嘗試置換(新值取決於sp,會在它的17位加1,並只取後31位.),前提是此時ctl的後32位(sp)與v中的scanState一致(差值d爲0)或可以替換ctl爲新值.置換新的scanState前會根據d來決定是否更換ctl,如有改變則cas掉ctl,將它替換成新值(原值增長了一個活躍數,並將後32位置爲v中上次scan保存的ctl後32位),d是0或d不是0且置換ctl成功,方纔去將v的scanState換爲vs.這一步成功,發現有阻塞線程則喚醒.
3.參數q全程醬油,惟一做用是信號,上述過程出現失敗,如cas不成功等,它非空則不循環第二次.
4.此處也是首次發現只增長活躍數不增長線程總數的狀況
此方法的執行只有三種結果:第一是知足活躍數/總數未達到最大,且無閒置數,則建立worker;第二是前一個不知足,且未在終止週期,知足嘗試喚醒的條件,會嘗試喚醒一個阻塞線程(它是ctl控制信號後32位索引去ws中取的w);第三種狀況,不知足前兩個條件,也不知足進入下個循環的條件,至關於什麼也沒作.
下面先來看tryRelease方法和runWorker方法,逐個分析未知字段的含義.
//喚醒並釋放worker v(隊列),若是它處於空閒worker棧的頂部,此方法是當至少有一個空閒worker //時的一個快速喚醒方式. //它的參數,c應當傳入此前讀取的ctl,v是一個工做隊列,若是不傳空,應當傳一個worker過來,inc表明活躍數的增長數. //若是成功釋放,則返回true. private boolean tryRelease(long c, WorkQueue v, long inc) { //相似上面的signalWork方法的計算方式,sp保存ctl的後32位,vs爲隊列v的下一個scanState. //值依舊是sp在17位加1並只取結果的31位. int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; //判斷是否知足條件,v存在且v的scanState是sp //(言外之意sp保存的是一個v的scanState,別急,咱們離真相愈來愈近了,註釋說此條件表明v是棧頂) if (v != null && v.scanState == sp) { //知足了前述的條件,v是當前"棧頂",這個棧頂的含義有些奇怪,沒有棧,何來棧頂?別急. //計算新的ctl,算法同上,老ctl加上inc的結果的前32位給nc的前32位,v保存的stackPred做爲nc的後32位. //在前面deregisterWorker中,tryRelease方法傳入的inc爲一個AC_UNIT.至關於增長一個活躍數. long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred); //嘗試用前面計算的結果更新爲新值. if (U.compareAndSwapLong(this, CTL, c, nc)) { //控制信號成功更新爲nc,則將v的scanState保存爲vs. v.scanState = vs; if ((p = v.parker) != null) //存在parker,喚醒. U.unpark(p); return true; } } return false; }
在deregisterWorker方法中咱們簡單提過tryRelease方法(有調用,在完成),回憶一下該方法在這一塊的邏輯,在嘗試解除worker註冊時,會在完成解註冊自己操做後嘗試tryTerminate,若是執行後未進入terminate流程,則進入tryRelease,若是拋去中間這些雜在其內的步驟,能夠粗放理解爲釋放一個worker會嘗試一次釋放,並且條件是sp(即ctl的後32位)非0,表明有"空閒"線程須要釋放,並且會用sp做索引調用tryRelease,試圖釋放掉棧點worker並增長一個活躍數.
顯然,tryRelease方法有一個重大的不一樣,它徹底是一個"try",對於釋放操做只嘗試一次,真正重要的操做是否進行徹底取決於一個對ctl的cas,只有它成功,纔會進行scanState的更新和線程的喚醒.所以該方法是一個不須要加鎖更改ctl的方法.
結合前面總結過的若干方法,咱們再次理解ctl,scanState等字段.
1.ctl的前32位與線程池中的線程數有關,而在這前32位中,前16位是活躍數,後16位記錄總數.
2.ctl的後32位稍有些複雜,第32位是表明不活躍的字段(INACTIVE&它不是0).同時它的後16位又能夠表明某個隊列在隊列數組中的索引(signalWork方法中嘗試取出它的後16位並從隊列數組中取值),奇特的是它的後32位彷佛也能夠表明某個隊列在隊列數組中的索引,並且這個還應該是"棧頂",在deregisterWorker方法中直接用了它的後32位做爲索引.
可見,ctl的真相愈來愈近了,但還不夠近.
3.隊列的scanState也愈來愈明顯了,它的初值絕對是在當前隊列在池中隊列數組的索引(前面register方法中直接將i給了v.scanState),每次的釋放的更新則是直接忽略掉sp的首位(不活躍位,即32位置0)並在17位加1.那麼此時它還能表明索引嗎?
關鍵點在於,scanState的後16位會不會增長,其實最終的答案將在externalSubmit方法中獲得一個重要的補充,你會發現,線程池中的workQueues大小初值絕對不會超過16位,同時在上面的registWorker方法中每當出現衝突碰撞,會嘗試對workQueues擴容一倍,並且它並未作出限制,可是registerWork方法須要經tryAddWorker方法建立線程,再由線程對象調用它註冊入池,而後你會發現tryAddWorker只能由前面已介紹的deregistWorker與signalWorker調用,後者限定它可以成功的前提是均是sp=(int)c==0,也就是說,只有後ctl的後32位無值才能夠添加worker(添加worker時後32位直接被無視,結果直接置0)前者限定爲不滿後者的條件時,必須是異常的狀況且知足ADD_WORKER位未被重置,那麼它或許有一次機會添加worker,一旦失敗,在tryWorker的循環第二次將不成功.
繞了這麼半天,仍是在上一段的開頭,就是爲了說明scanState既然初始值是一個只有16位的奇數(add worker成功時初始化,非worker的隊列還未介紹到),儘管在每次release都在17位嘗試加1,目前來看,並不影響它保存本身的索引(後16位),問題就在於這後16位是否足夠,每次添加worker,確定要佔用一個新的索引,而添加worker完成並在註冊時發生衝突必定次數後會擴容(tryAddworker→registWorker),前面論述過,externalSubmit方法將workQueues數組初始化一個堅定不大於short型長度的數組(後面論述externalSubmit會簡單再提),而從signalWorker調用tryAddWorker時,ctl的後32位必須爲0,從deregisterWorker中雖然容許第一次不判斷int(c)==0,但它實際上已經卸載了一個worker.那麼至少從目前來看,在tryRelease和signalWork方法中使用ctl的後32位去計算新的scanState,隱含的意思彷佛是ctl的後32位此刻包含了"類"原來的scanState,並且新的scanState只用ctl的後32位的首位和17位,潛臺詞彷佛就是此時的ctl的後16位絕對能表示v的索引,不然咱們沒法用這個新的scanState(vs)去從數組中找到隊列.
提示:必定要結合上面signalWork源碼註釋中的(2.2隱藏條件),綜合tryRelease的註釋要求參數v是"棧頂",調用它的deregisterWork方法傳入的v是vs中索引爲sp&(n-1)所得,所以大膽的推測已經成熟.
1.ctl的後16位若是有值,將會是"棧頂"隊列在ws中的索引.它被用於釋放棧頂資源(如parker)時找到queue,以及對queue中的scanState若干步驟的從新設置,而從新設置時也不會影響後16位.所以索引在註冊依始就不會再變,擴容也不會改變它的位置.
2.scanState字面意思是掃描狀態,ForkJoinPool還有scan方法未介紹.
那麼,就用後面的代碼來驗證這個繞腦的推理吧.
//runWorker方法是線程運行的最頂層方法,它由ForkJoinWorkerThread在註冊成功後調用,也是所有生命週期. final void runWorker(WorkQueue w) { //1.一上來初始化數組,前面說過,WorkQueue內部的任務數組初始化是null. w.growArray(); //2.用seed保留構建時的hint隨機數,在registerWorker方法中曾介紹過, //會有一個隨機數s是保證每一個隊列不一樣的,且其中有一個每次增長一個值的成份,該值是個數學中很奇異的數字. //而hint的初值即這個s,它同時也被用於肯定隊列在ws中的索引,間接決定是否擴容. int seed = w.hint; //3.初始化r,並避免異或時出現0. int r = (seed == 0) ? 1 : seed; //4.循環. for (ForkJoinTask<?> t;;) { //5.嘗試"scan"(終於出現了有沒有)隊列w,使用隨機數r if ((t = scan(w, r)) != null) //6.scan到了一個任務t,則運行它.這是進入一個任務處理的主流程,前面已介紹過WorkQueue的runTask方法. //回憶一下,它會在過程當中把scanState標記爲忙碌. w.runTask(t); else if (!awaitWork(w, r)) //7.scan不到,嘗試等待任務,若是等待過一段時間還未等待,進入8重置r,繼續下輪循環scan.若awaitWork返回false表明應break結束worker. //關於awaitWork的返回咱們後面詳解. break; //8.或許只能說just math. r ^= r << 13; r ^= r >>> 17; r ^= r << 5; } }
runWorker是線程run方法中直接調用的,進入業務主邏輯,也結合了前面的runTask方法,初始化數組等方法和將要介紹的與scan有關的邏輯,還再一次用到了那個魔法數字.
傳給scan方法的r每一次循環(scan成功並運行,又await到了新的任務)都會從新賦個新值,做者看不懂新值的算法,但在咱們即將去了解的scan方法中使用了r來計算索引,所以做者更關心它的奇偶性.
很明顯,r不論初始是奇是偶,新計算的r值能夠是奇數也能夠是偶數.也就是說,使用r&(n-1)取出的ws中的一個WorkQueue,多是線程註冊時生成的一半之一,也可能不是.
接下來介紹scan方法.
//嘗試掃描隊列,並偷取一個"頂級"的任務.掃描開始於一個隨機位置(與r有關),若是在掃描過程當中發生了 //競態,則移動到一個隨機的位置繼續,不然線性地掃描,這個過程持續到在全部相同校驗和 //(校驗和的計算會採樣每一個隊列的base索引,而base索引會在每次偷的時候移動)的隊列上有兩次 //連續的空傳遞,此時worker會嘗試對隊列進行滅活並從新掃描,若是能找到一個task,則嘗試從新 //激活(從新激活能夠由別的線程完成),若是找不到task,則返回null用於等待任務.掃描過程應當減小內 //存使用,以及與其餘正在掃描的線程的衝突. //參數w爲目標隊列,r是前面傳遞的種子,返回task或null. private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; //當前工做線程必須是已經完成註冊的,即存在工做隊列,且r&m能取得它的隊列,不然直接返回null. if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //1.scan方法是在runWorker的循環中調用的,初次調用時,scanState的值是i(前面說過),是個非負值. int ss = w.scanState; //scan方法內部開始循環. 用r&m,即w的索引給origin和k,初始化oldSum和checkSum爲0. for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; //2.選擇隊列q存在的邏輯. if ((q = ws[k]) != null) { //2.1 目標隊列q非空(自己base到top間至少存在1個,任務數組非空. if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { //計算任務數組的base索引(參考WorkQueue源碼). long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //2.2數組中取出base對應task存在,base未改變的邏輯. if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { //2.3 初始記錄的scanState不小於0,表明存活的邏輯. if (ss >= 0) { //2.4嘗試cas掉base處的任務,注意,必定只能從base開始,不會將任務數組中間的元素置空. if (U.compareAndSwapObject(a, i, t, null)) { //cas成功,更新base. q.base = b + 1; if (n < -1) //2.5發現隊列q的base到top間不止一個任務元素,則喚醒它可能存在的parker. //重溫一下signalWork的簡要邏輯,ctl後32位0且知足加worker條件,tryAddWorker, //條件不知足(忽略終止等判斷邏輯),則計算新的scanState(使用到原ctl的後32位)和ctl(使用原ctl的前32位和q的stackPred), //在cas爲新的ctl成功的前提下,換掉新的scanState. signalWork(ws, q); //2.6 只要2.4成功,返回彈出的任務. return t; } } //2.7 從scanState看已是inactive的狀況.嘗試活化. else if (oldSum == 0 && w.scanState < 0) //tryRelease前面已介紹過.嘗試釋放掉棧頂,顯然ws[m&(int)c]被視爲棧頂,即ctl的後32位(嚴格來講彷佛是後16位)表明棧頂的索引. //釋放時對ctl的增量是一個AC_UNIT. tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } //2.8 只要沒有進入2.4->2.6,重置origin,k,r,校驗和等參數並開啓下輪,但整個2工做線用不到,進入3工做線纔有用. if (ss < 0) // refresh //可能會有其餘搶到同一個隊列的worker在2.5/2.7處重活化了scanState,所以當它是inactive的狀況,重刷新一次. ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } //2.9校驗和增長b checkSum += b; } //3.持續迭代到穩定的邏輯. //這個表達式大概能夠理解,線性的增長k,每次加1,直到發現已經從一個origin轉滿了一圈或n圈. if ((k = (k + 1) & m) == origin) { //條件:scanState表示活躍,或者知足當前線程工做隊列w的ss未改變,oldSum依舊等於最新的checkSum(校驗和未改變) if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive //3.1知足前面註釋的條件,且w已經inactive,終止循環,返回null. break; //3.2又是這一段計算和替換的邏輯,只不過ns(new scanState)要加上非active標記. int ns = ss | INACTIVE; // try to inactivate //3.3嘗試計算用來替換的ctl,它的後32位爲ss加上非活躍標記,前32位減去一個活躍數單元.(終於到這了,參考前面分析的ctl前32後32位,驗證了) long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); //原來ctl的後32位存給隊列的stackPred. //注意,此時w.stackPred和新的ctl的後32位都有一個共性,那就是它們的後31位均可以用來運算並計算得w在ws的索引. w.stackPred = (int)c; // hold prev stack top //3.4先把w的scanState換成ns,再用cas換ctl爲nc. U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) //替換ctl成功,ss直接指向ns,省去一次volatile讀. ss = ns; else //3.5替換失敗,再把w的scanState設置回ss. w.scanState = ss; // back out } //3.6每發現回了一輪,校驗和置0. checkSum = 0; } } } return null; }
scan方法是個比較重要的方法,結合前面提過的runWorker,再次提示scan方法是在runWorker中循環調用的,固然每一次都伴隨着完成一波任務和等待新任務的到來.
單輪scan中也是處在循環的,沒有競態也找到了非空隊列的狀況下,顯然會很容易從base處出隊一個合法的任務,即從1->2->2.1->2.2->2.3->2.4(->2.5可選)→2.6,會返回正常查找的任務.
進入2表明某一輪循環開始時找到了隊列,這以後主要有三條線,其中一條線是前面說的1->2->2.1->2.2->2.3->2.4(→2.5可選)→2.6的正常線.
第二條線,1->2->2.1->2.3->2.4->2.7(在2.4的cas時發生競態失敗),或1->2→2.1→2.2→2.7(在2.2判斷任務爲空或者已經有其餘線程捷足先登地取走了base),最終結果是執行2.7的歸置操做(出現競態,隨機改變索引).
第三條線,1->2->2.1->2.9(2.1處發現目標WorkQueue已是空隊列),則增長一次校驗和.
這三條線都可以忽略起點(1),由於能夠在某次循環時從2開始.
任何一次循環到達3相應的有兩處入線,第一條1->2->3(在2處發現ws數組的k索引處尚且沒有隊列元素,這說明連工做線程註冊都沒有完成;第二條線是基於前面的第三條線,即1->2->2.1->2.9→(即2.1失敗,前面2的步驟,只要進入到2.1,就必定是return或者continue開啓下一輪).
單次循環到達3的兩條路線的區別是第二種狀況是校驗和會增長最初獲取到該隊列時讀取的base值(2.1).
從到達3開始分析,有兩條大分支,將取ws元素的下標k進行增1操做,判斷是否已經完成了一輪(等於上輪記錄的origin),未完成一輪,直接再開啓一次循環(由於一開始就沒讀出WorkQueue,因此無競態,線性加1),從上次記錄origin到如今已完成一輪的狀況進入3內的分支.
進入3,最後都會將校驗和歸0(3.6),也就是說每查完一整輪就會讓校驗和復位0.可能會根據scanState決定是否進入3.1,此處也有分線.
線路1,若當前隊列q已是非存活態(scanState是負數且這輪循環未更新,且校驗和在本輪循環中未改變),或發現隊列的qlock已經被鎖定負(前面講過要滅活),直接break,執行路徑3→3.1.
線路2,當q是存活態,由於已經找了一輪,沒有意義再去找了,將q滅活,相應的計算新的ctl和scanState的邏輯與前面tryRelease/signalWork的方式正好相反,scanState直接加上滅活標記位並存爲ns,且ns將交給新的ctl的後32位,新ctl的前32位則減去一個活躍單元,並把ctl原來後32位的狀態存給q的stackPred,這也就是ctl後32位能表示當前"棧頂"的緣由.並且這一過程當中,涉及q在ws中的索引有關的值不受影響,依舊能夠用ctl的後32位來找到它.前面說過,當進行release等操做時,能夠將棧頂(就是用ctl後32來取)的stackPred取出復位,正是由於這個原理(滅活時存放了此前的ctl後32位).
形象一點理解:
1.滅活一個q,則將它的scanState加上非存活標記位(不影響後面的索引標識,它是32最高位),將ctl的後32位變化後存給它的stackPred,將ctl設置新值,前32位進行減活躍數的邏輯,後32位用新的ns來替換.
2.再滅活一個新的w,重複上一個邏輯,則新的w是"滅活棧"的棧頂,新w的索引會保存在新的ctl裏,原ctl中存放的上一個q的索引被置爲當前w的stackPred.
3.release或者signal,對棧頂元素有一個相應的操做,將它從新激活,會將它的stackPred通過反向算法交給ctl,而它本身的scanState又簡單恢復成包含索引的合法結果(ctl後32位加一個標識位並去除非活躍位的1).release以後的ctl依舊可能存在非零的後32位(這取決於剛出的棧頂是否是棧底),一樣signal中的tryAddWorker,只有在ctl後32位乾淨時纔會調用,也說明了這一點.
注意,3.4對ss進行了加上INACTIVE標記位的操做,即令ss變成負值,但在3中並不會在此退出循環,下一輪循環中可能再次進入3.1知足了break條件並退出循環返回null,也可能進入2.9增長校驗和,或者在下一次循環中進入2.8從新刷新runState,這時若是此前已經有別的線程在2.7進行了當前worker的釋放或者tryRelease/signal等操做(共同的要求:當前worker此時是棧頂),會所以令下一次循環有機會從2.4返回.
終於串了起來,終於搞懂了WorkQueue之間這個又是數組又是棧的結構了.
顯然,這種棧的實現方式真是夠少見...它確實是個棧的結構,不過棧元素自身維護的數據須要不停地和外界(池)中的ctl進行後面位的交換.
到此,前面的難點與疑問終於清楚了.
同時也能夠發現,scan方法,runWorker方法和前面WorkQueue的runTask等方法共同組成了"工做竊取"的調度機制,明顯一個線程在註冊入池後啓動,每一輪大循環都會先從scan到一個task開始,獲取不到直接awaitWork,獲取到,則先執行task,再執行本身localTask,由於咱們提早介紹了ForkJoinTask和它的若干子類以及doug在官方文檔中給咱們的用例,所以你們很容易理解這一點,一個任務在運行過程當中,極可能會有新的任務由它而生併入池,如今沒有看到入池的源碼,但在前面介紹過,ForkJoinWorkerThread來fork出的任務入池時是入本身的隊列,外部線程提交入池的任務則是externalSubmit一類,這兩部分的源碼都會在後面介紹.
顯然一個線程剛剛啓動時,它的workQueue徹底是空的,相應的另外一個線程在scan時若獲取了它的隊列必然會忽略,當前線程也必須先從scan開始,從隨機的隊列中按必定的規則(衝突時重置一個隨機的索引位置,不衝突但發現未註冊worker等狀況時直接索引加1)去偷取一個原始的任務,並且從base(先入任務)開偷,而後先運行這個偷來的task,再運行本身的本地任務,在運行該task時,可能就會fork出多個子任務入了本身的任務數組,所以再運行本身的本地任務時纔有活可幹,完成全部本地任務後,runWorker進入下一輪循環,繼續scan->waitScan→runTask的流程.
接下來繼續看awaitWork方法.
//字面意思,等待有工做可作. //它其實可能會阻塞一個worker偷取任務的過程,若是worker應當關閉則直接返回false. //若是worker已經處於非活睡在態,且引發了線程池的靜寂,則檢查線程池的終結態,只要當前worker //不是惟一一個worker就等待一段時間.若是等待超時後ctl未改變(前32位的數量信息未變,後32位的棧信息也未變), //則終止當前worker,它可能會喚醒另外一個可能重複這個過程的worker //參數w,調用者worker,r是一個自旋用的隨機數種子,若是worker應當關閉,返回false. private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating //一個線程從註冊入池起就有隊列,若是它爲空或者qlock被置爲負(-1),應當終結. //前面提過,在deregisterWorker或tryTerminate時會將qlock置-1. return false; //初始化相關值,保留隊列中保存的前一個棧,取出隊列的ss,賦值自旋數.SPINS在前面分析 //運行狀態加鎖時介紹過,它的值當前就是0,參考awaitRunState方法,在等待runState鎖的時候,也能夠根據它先自旋. for (int pred = w.stackPred, spins = SPINS, ss;;) { if ((ss = w.scanState) >= 0) //1.隊列的scanState大於0,回憶一下,前面介紹tryRelease和signal中計算vs的方法,其中一步是與~INACTIVE,而INACTIVE是1<<31 //在前面的scan方法中已經遍歷一輪且未找到task又未出現競態未更改校驗和的狀況,會將scanState加上INACTIVE. //所以此處scanState忽然不小於0,說明是經歷過相似tryRelease或signal的釋放喚醒動做,退出循環等待. break; else if (spins > 0) { //2.當前未被活化,依舊處於INACTIVE態,則首先嚐試自旋.使用r這個隨機數來決定是否對自旋次數減1. r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins //2.1自旋次數達到0時作了勾子操做. WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && (v = ws[j]) != null && // see if pred parking (v.parker == null || v.scanState >= 0)) //2.2自旋次數降到0時,若知足幾個條件: //當前隊列保存的棧下一個隊列的索引(pred)存在,線程池隊列非空,pred未溢出隊列數組, //取出pred對應的ws的隊列(它實際上是當前w在棧向棧底前進一個的元素,它存在說明當前w不是棧底. //若是該元素存在,且它沒有阻塞者或它還保持active,則重置自旋次數,繼續自旋. spins = SPINS; // continue spinning } } else if (w.qlock < 0) // recheck after spins //3.自旋結束後,再次檢查w的隊列鎖,看它是否是已經被終止了.(deregisterWorker或tryTerminate). return false; else if (!Thread.interrupted()) { //4.若是當前線程還未被擾動. //目前咱們只在一個地方看到過線程擾動的狀況:awaitRunStateLock,即當一個線程嘗試去修改池的運行時狀態,它會去獲取一個runState鎖, //獲取失敗,發生競態,也通過自旋等輔助策略無效的階段,則會嘗試使用stealCounter來看成鎖加鎖,unlock時也會在確認競態的狀況下去用它喚醒. //而在awaitRunStateLock中阻塞的線程若是正在進行stealCounter.wait時,wait操做被中斷,則會擾動當前線程,這將去除進入此分支的可能. //此外,tryTerminate自己也有擾動其餘工做線程的步驟.若是用戶不在相應的實現代碼(如ForkJoinTask的exec函數或CountedCompleter的compute函數) //中手動去擾動當前工做線程,能夠理解awaitRunStateLock的擾動事件可能與tryTerminate有關. long c, prevctl, parkTime, deadline; //計算新的活躍數,它是原ctl的前16位(負)加上並行度. int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // pool terminating //5.發現活躍數已降至0,嘗試調用tryTerminate,方法返回true代表已終止或正在終止;或發現runState已經進入終結程序. //這兩種狀況直接返回false,線程執行完畢終止. return false; if (ac <= 0 && ss == (int)c) { // is last waiter //6.前面分析scan方法時討論過,棧頂元素的scanState體如今ctl的最新後32位,它的stackPred則是ctl以前的後32位值. //進入6,說明當前worker是棧頂,即最後一個等待者. //用pred計算出以前的ctl. prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); //取ctl的17-32位,即worker總數. int t = (short)(c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) //6.1若是發現線程總數大於2,將ctl回滾,返回false讓線程終止. return false; // else use timed wait //6.2計算deadLine和parkTime,用於後續的定時等待,暫不終結當前線程,而是做爲parker. //IDLE_TIMEOUT最開始說過,它就是起這個做用的一個時間單位,把gc時間也考慮在內,默認爲2秒. parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else //7.存在active的worker或當前w不是棧頂. prevctl = parkTime = deadline = 0L; //8.作線程停段的工做. Thread wt = Thread.currentThread(); //把當前線程池做爲parker設置給線程,當使用LockSupport.park時,它將被看成一個參數傳遞(參考Thread類註釋,在java方法簽名處看不出來). U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport //設置parker. w.parker = wt; if (w.scanState < 0 && ctl == c) // recheck before park //8.1從新檢查非active.合格則停頓. U.park(false, parkTime); //歸置. U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) //8.2停頓(或未停頓)重檢查發現w被從新active,則退出循環返回true(非false表明不能終結當前線程). break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) //8.3發現沒有時間了,ctl也未在等待的時間發生變化,將ctl設置爲w入棧前的結果,返回false讓終結此線程(相似出棧). return false; // shrink pool } } return true; }
awaitWork方法稍長一些,可是內容大可能是前面已經介紹過的字段,若是前面的有關方法和字段較熟悉,這一塊不難理解.
同時,它也驗證了我前面對runState,ctl和stackPred的猜想正確.
它的返回值也值得注意,從返回值來講,它的做用也不止是簡單的"等待操做",它返回false會形成線程的終結,而返回true時,runWorker方法會重開一輪,再一次嘗試獲取任務,
而返回true只能發生在兩個break(1和8.2)檢查scanState時,這說明w被活化.
接下來看一些與join有關的操做,這些操做大可能是由外部(工做線程以外,甚至線程池以外的線程)調用,也能由其餘類(非ForkJoinPool,如已經介紹過的ForkJoinTask和CountedCompleter)進行調用和調度.
//幫助完成,調用者能夠是一個池中的工做線程,也能夠是池外的.在JDK8版本中,有三處調用: //1.CountedCompleter::helpComplete,該方法的調用由咱們決定. //2.ForkJoinPool::awaitJoin,等待結果的同時能夠嘗試幫助完成,只由池中線程調用,傳入的隊列是該線程的隊列.該方法由ForkJoinTask的join/invoke/get調用. //3.ForkJoinPool::externalHelpComplete,用於外部線程操做,前面在CountedCompleter的文章已粗略介紹,傳入的w爲ws中用一個隨機數與n-1和0x007e取與運算 //的結果,很明顯,即便w不是null,也只能是一個偶數位的元素,這意味着w不會是registerWoker時生成的帶有工做線程的WorkQueue.也就是不能幫助池中線程完成本身的隊列. //本方法會嘗試從當前的計算目標以內偷取一個任務,它使用頂層算法的變種,限制偷出來的任務必須是給定任務的後代,可是也有一些細節要注意. //首先,它會嘗試從本身的工做隊列中找合格的任務(用前面講過的WorkQueue::popCC),若不能找到則掃描其餘隊列,當發生競態時隨機移動指針,依照校驗和機制決定是否放棄 //幫助執行(這取決於前面介紹的pollAndExecCC的返回碼).參數maxTasks是對外部使用的支持參數,內部調用它會傳入0,容許無界的次數(外部調用時,捕獲非法的非正數). //參數w,隊列,在內部調用的狀況下能夠理解爲當前線程的工做隊列,參數maxTasks若是非0,指代最大的可運行的其餘任務.退出時方法返回任務狀態. final int helpComplete(WorkQueue w, CountedCompleter<?> task, int maxTasks) { WorkQueue[] ws; int s = 0, m; //變量初始化和驗證,隊列和參數w必須非空才能進入if. if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && task != null && w != null) { //介紹popCC時曾專門強調過這個mode實際上是config. int mode = w.config; // for popCC int r = w.hint ^ w.top; // arbitrary seed for origin int origin = r & m; // first queue to scan //初始時賦h爲1,在每一輪循環中,它取1表明正在正常運行,大於1表明發生了競態,小於0將增長到校驗和,表明pollAndExecCC達到了根元素. //詳細參考前面論述過的pollAndExecCC. int h = 1; // 1:ran, >1:contended, <0:hash //初始化條件循環條件,記錄origin的值,初始化oldSum和checkSum for (int k = origin, oldSum = 0, checkSum = 0;;) { CountedCompleter<?> p; WorkQueue q; //1.傳入的任務已是完成的,break返回s(負). if ((s = task.status) < 0) break; //2.h未通過更改或經歷過若干次更改,但在上一輪循環表明了pollAndExecCC成功執行task(h取1),則 //在當輪循環嘗試對w進行popCC,並根據mode決定從base仍是top出隊. if (h == 1 && (p = w.popCC(task, mode)) != null) { //2.1本隊列有知足條件的任務,執行之. p.doExec(); // run local task if (maxTasks != 0 && --maxTasks == 0) //減小maxTask並在它降到0時break.(前提是傳入了正數的maxTasks). break; //2.2沒降到0,把origin和校驗和參數重設爲循環初始化的值. origin = k; // reset oldSum = checkSum = 0; } //3.某輪循環h表明出現競態等問題或不能使用popCC方式從本地隊列出任務執行.嘗試從其餘隊列poll執行. else { // poll other queues //3.1 找不出任務,h置0,這將使它在隨後的循環中不會再進入2 if ((q = ws[k]) == null) h = 0; //3.2嘗試從q的base處poll並執行task.返回-1表明不匹配,對校驗和增長h(負數). else if ((h = q.pollAndExecCC(task)) < 0) checkSum += h; //3.3 h大於0,多是等於1但popCC未成功的狀況.也多是pollAndExecCC成功了一次或cas失敗. if (h > 0) { if (h == 1 && maxTasks != 0 && --maxTasks == 0) //h是1減maxTask,當它達到0終止循環.(前提是沒傳了正數的maxTasks) break; //h不等於1,通常是poll時cas失敗,重置r,origin,checkSum等,開下一輪循環. r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift origin = k = r & m; // move and restart oldSum = checkSum = 0; } //3.4前面見過相似的代碼,發現已經轉完了一輪,校驗和未改變過(任何一個隊列都未進3.2/3.3,也就是查找任何一個下標ws[k]都是null),break. else if ((k = (k + 1) & m) == origin) { if (oldSum == (oldSum = checkSum)) break; //發現校驗和有變動,說明有一輪循環未進入3.1,再次循環. checkSum = 0; } } } } //返架退出循環是task的status. return s; }
顯然,task在整個help過程當中不會被執行,一旦某一輪循環發現task已經完成了,那麼當即結束循環.此方法可讓進行join/get等操做的線程幫助完成一些有關(子任務)的任務.
只要存在非空隊列,task未完成,當前線程未能幫助完成maxTask個任務(或初始就指定了0),當前線程就會一直循環去找任務,直到發現task完成了爲止.
簡單分析一下一輪循環的工做流程.
顯然首輪循環(不考慮一上來task的status就小於0的狀況)必然能從2進入,在若干輪後某一輪未能pop成功而進入了3,3中若干輪後poll成功,則h從新被置爲1,形成下一輪循環又能夠進入2的流程.
直到:某一輪task完成了;某一輪maxTask完成了(指定的狀況);某一輪再次發現h爲0且發現已經對ws全部隊列轉滿了一圈.
仍是再邏輯一下該方法的使用.
1.外部使用ForkJoinTask的get/join等方法時,引用到ForkJoinPool::externalHelpComplete,它調用helpComplete傳入的隊列必定是偶數索引的隊列.非工做線程維護的隊列;或引用到ForkJoinPool::awaitJoin,調用helpComplete傳入maxTasks是0,意味着可能循環到直到task完成爲止.
2.內部線程可能在CountedCompleter::helpComplete中使用此方法,這種狀況下,須要咱們在compute方法中進行調用.
接下來看helpStealer方法.
//字面意思:嘗試幫助一個"小偷". //本方法會嘗試定位到task的偷盜者,並嘗試執行偷盜者(可能偷盜者的偷盜者)的任務.它會追蹤currentSteal(前面runTask時提過,會將參數task置爲currentSteal)-> //currentJoin(當前隊列等待的任務,後面會介紹awaitJoin方法),這樣追尋一個線程在給定的task的後續工做,它會使用非空隊列偷回和執行任務.方法的第一次從等待join調用 //一般意味着scan搜索,由於joiner沒有什麼更適合作的,這種作法也是ok的.本方法會在worker中留下hint標識來加速後續的調用. //參數w表明caller的隊列,task是要join的任務. //方法共分三層循環,最外層是一個do-while循環,其內是兩個for循環. private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { //初始化和進入if的條件,沒什麼可說的. WorkQueue[] ws = workQueues; int oldSum = 0, checkSum, m; if (ws != null && (m = ws.length - 1) >= 0 && w != null && task != null) { //循環一:方法最外層的while循環.它的條件是task未完成且校驗和未發生變化. do { // restart point //每次循環一的起點,校驗和重置爲0.用j保存w. checkSum = 0; // for stability check ForkJoinTask<?> subtask; WorkQueue j = w, v; // v is subtask stealer //循環二:外部for循環,subtask初始指向參數task,循環條件是subtask未完成. //在每次循環四中會校驗當前小偷的隊列是否空了,若是空了則換它的小偷繼續偷(交給subtask指向). descent: for (subtask = task; subtask.status >= 0; ) { //循環三:內部第一個for循環,初始化變量h,用hint加上奇數位,保證從奇數索引取隊列.k初始爲0,每次循環結束加2. for (int h = j.hint | 1, k = 0, i; ; k += 2) { //1.循環三內的邏輯. //1.1發現k已經遞增到大於最大索引m了,直接終止循環二,若發現task還未完成,校驗和也未更改,則進行上面的重置操做並從新開始循環二. if (k > m) // can't find stealer break descent; //1.2i位置標記爲h+k的結果與運算m,由於k每次增2,h又是奇數,故保證只取有線程主的隊列. if ((v = ws[i = (h + k) & m]) != null) { if (v.currentSteal == subtask) { //發現偷取currentSteal的worker v,將它的索引i交給j(初始爲w,在2內會更改成"等待隊列"的元素,在while循環中會重置爲w)的hint, //方便下一次再進入循環三時的查找.並終止循環三,進入循環四的判斷入口. j.hint = i; break; } //1.3存在非空v,可是v未偷取subtask,將v的base加給校驗和.這將影響到循環一的判真條件.顯然從循環三退出循環二,或後續循環四退出循環二 //將致使循環一也一併因while條件不滿而退出. checkSum += v.base; } } //循環四:迭代subtask的循環,它必須經循環三中的1.2走出. //2.到達循環四,必定已經在循環三中找到了一個v,此處會嘗試幫助v或者它的產生的"後裔". for (;;) { // help v or descend ForkJoinTask<?>[] a; int b; //2.1相似1.3的邏輯,校驗和增長v.base,初始化next,增長校驗和意味着,只要從循環四退出了循環二,則最外的循環一的while條件將不知足. checkSum += (b = v.base); //next取v的currentJoin. ForkJoinTask<?> next = v.currentJoin; if (subtask.status < 0 || j.currentJoin != subtask || v.currentSteal != subtask) // stale //2.2若是subtask已經是完成態,或發現競態等狀況形成數據已髒,如發現本輪循環中j的當前join已不是當前subtask, //或v的當前steal不是subtask,說明出現了髒數據,直接終止循環二,從新進入while循環重初始化jv. break descent; //2.3發現隊列v已空的邏輯. if (b - v.top >= 0 || (a = v.array) == null) { if ((subtask = next) == null) //2.3.1 v已空,且不存在next,即"等待隊列"已空,退出循環二,從新while斷定循環條件,重初始化jv. break descent; //2.3.2 還有next,將subtask指向next的同時,用v替換掉j.這是明顯的迭代語句. //在前面的代碼中能夠看出,循環一就是爲subtask找出小偷v的,關係是v.currentSteal=subtask.同時j.currentJoin=subtask. //由於next=v.currentJoin,將v賦給j後,仍舊知足j.currentJoin=next=subtask,此時break掉循環四,從新開啓循環二的新一輪 //正好對v進行從新初始化,而找到v的條件又是v.currentSteal=subtask,也即等於j.currentJoin. //此處break掉的循環四將致使循環二的下輪將在循環三處從新爲新的j找到v(v.currentSteal==subtask). j = v; break; } //2.4未進入2.3.1/2.3.2的狀況,顯然進入這二者會break到循環一或二. //取出base索引位置i和相應的任務元素. int i = (((a.length - 1) & b) << ASHIFT) + ABASE; ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i)); //2.5,接2.4,判斷競態,v.base!=b說明已經被別的線程將base元素出隊.這種狀況下直接進入下一輪的循環二. if (v.base == b) { //2.5.1 取出任務t,發現空爲髒數據,從while循環從新初始化. if (t == null) // stale break descent; //2.5.2,將t出隊並進行後續流程. if (U.compareAndSwapObject(a, i, t, null)) { //2.5.3首先將v的base增1. v.base = b + 1; //2.5.4取出w(方法參數,當前worker)的currentSteal保存到ps. ForkJoinTask<?> ps = w.currentSteal; int top = w.top; do { //2.5.5此循環不和循環一至四一塊羅列,由於它本質上只是任務的出隊與執行. //首先會嘗試將w隊列的currentSteal置爲剛剛從v的任務數組中出隊的t U.putOrderedObject(w, QCURRENTSTEAL, t); //執行t.執行後順帶循環處理本身剛壓入隊列w的任務.執行後,也跳出當前while循環的狀況下會在下次從新判斷2.3, //非空繼續找base(i),爲空則迭代v爲next(2.3.2). t.doExec(); // clear local tasks too //2.5.6循環條件,只要參數task還未完成,w新壓入了任務,則依次嘗試從w中pop元素,和前面的t同樣按序執行(此處順帶執行本身的任務). } while (task.status >= 0 && w.top != top && (t = w.pop()) != null); //2.5.7偷了小偷v的base任務並執行成功,則恢復w的currentSteal. U.putOrderedObject(w, QCURRENTSTEAL, ps); if (w.base != w.top) //2.5.8偷完並執行完當前v的base任務或者某一輪的等待隊列上的元素v的base任務後,發現本身的隊列非空了,就再也不幫助對方,方法return. //能夠參考awaitJoin方法,由於helpStealer只在awaitJoin中調用,調用的前提就是w.base==w.top. //這顯然與2.5.6有所糾結(儘管一個判斷top,一個判斷top和base的相等),只要到了2.5.8,隊列非空將返回. return; // can't further help } //出隊失敗同2.5.1同樣,競態失敗從新循環二,但在下一輪循環中會在2.5.1break回while循環. } } } //最外層的while循環條件,task未完成,校驗和未發生更改. } while (task.status >= 0 && oldSum != (oldSum = checkSum)); } }
helpStealer方法不短,內容和信息也很多,但鑑於前面已經不停地滲透與它有關的"竊取","外部幫助"等概念,此處只再囉嗦一點細節,也解釋一些註釋中的疑惑.
1.回顧一下currentSteal,在scan方法中會把參數task設置爲自身隊列的currentSteal.並且runTask在每輪循環會先運行這個task,再運行隊列的本地任務,每輪循環都會更新它.
2.若是我創建了一個ForkJoinTask,並fork出若干子任務並join之,或者在外部ForkJoinTask::join等方式,至關於造成了一個"等待隊列",即任務之間彼此join,用currentJoin標識(這一塊在awaitJoin方法詳解).
3.僅有w.base==w.top時才能執行此方法,若是執行過程當中發生條件變化,則在執行完當前小偷v的某個任務後進行檢查會發現, 就會迴歸到本身的任務隊列.
4.同1,其實helpStealer方法至關於沿着currentJoin隊列進行幫助,首先找到本身w的小偷,幫他執行完剩下的任務,而後順着它們join的任務去執行.對於等待隊列的逐個迭代過程,依靠currentJoin和currentSteal二者的配置,經過currentJoin找到next,也即下一個subtask,再遍歷ForkJoinPool中的ws找到currentSteal是subtask的worker,如此迭代並重覆上面的全部過程.
5.玩笑式的聊一下這個"反竊取",甚至有點"賠償"的概念了,一個worker發現/或線程池外的線程去幫助偷了我任務的工做線程(worker,或WorkQueue,即ws的奇數索引位元素)從base處執行,直到執行乾淨,再找它currentJoin的任務所屬於的隊列,繼續這個完成過程,直到發現本身的任務被完成了爲止.可見,我發現你是我這個任務的小偷,我不但要偷你的所有身家,還要偷走偷了你的任務的小偷的所有身家,若是他也被偷過,那我再找他的小偷去偷,直到找回失主(我丟的任務)爲止.
由於一個任務的完成要先從出隊開始,所以不會出現兩次執行的狀況,能夠放心大膽的竊取.
上面的45是比較簡單的形象例子,但也不妨再加上一小段僞代碼.
好比我先建立了一個ForkJoinTask子類並new一個對象,在它的exec方法中我fork了多個任務,那麼當我去submit到一個ForkJoinPool後,我使用get方法去獲取結果,此時美妙的事情就發生了.
1.我提交它入池,它進入了隊列,並被一個工做線程(小偷)偷走.
2.它被偷走後,我纔去get結果,此時發現task還未結束,我須要等.可是我就這樣乾等嗎?不我要找小偷,我要報復.
3.開始報復,可是我只能偷他的錢庫(array),小偷對本身偷來的任務很是重視,它放在currentSteal裏面了,我偷不到,只好把他的錢庫偷光.
4.偷完他的錢庫,我發現個人task(失物)尚未完成,我仍是不能閒着,一不作二不休,我發現小偷也有join的任務,這個被join的任務不在他的隊列,也被其餘小偷偷走了,那麼我找到新的小偷,再偷光它的財產.
5.若是個人task仍是未執行完畢,我再找新的小偷;不然返回便可.或者我每偷走一個小偷的任務時,忽然發現個人倉庫提交了新任務,那我就不能再去偷了.
這是外部線程的執行結果.但若是幫助者自己是一個工做線程,那麼流程也類似,讀者自行捊順吧.
//字面意思:嘗試補償. //方法會嘗試減小活躍數(有時是隱式的)並可能會因阻塞釋放或建立一個補償worker. //在出現競態,發現髒數據,不穩定,終止的狀況下返回false,並可重試.參數w表明調用者. //方法實現比較簡單,爲簡單的if else模式,只有一個分支能夠執行. private boolean tryCompensate(WorkQueue w) { //canBlock爲返回值. boolean canBlock; WorkQueue[] ws; long c; int m, pc, sp; //1.發現調用者終止了,線程池隊列數組爲空,或者禁用了並行度,則返回false. if (w == null || w.qlock < 0 || // caller terminating (ws = workQueues) == null || (m = ws.length - 1) <= 0 || (pc = config & SMASK) == 0) // parallelism disabled canBlock = false; //2.發現當前ctl表示有worker正等待任務(空閒,位於scan),則嘗試釋放它,讓它回來工做. else if ((sp = (int)(c = ctl)) != 0) // release idle worker canBlock = tryRelease(c, ws[sp & m], 0L); else { //3.當前全部worker都在忙碌. //3.1計算活躍數,總數,計算方法前面已經論述屢次. int ac = (int)(c >> AC_SHIFT) + pc; int tc = (short)(c >> TC_SHIFT) + pc; //記錄nbusy,註釋表示用於驗證飽合度. int nbusy = 0; // validate saturation for (int i = 0; i <= m; ++i) { // two passes of odd indices WorkQueue v; //3.2nbusy的計算方法,遍歷線程池的隊列數組(每次增1),驗證則以1-3-5這個順序開始,發現有處於SCANNING態的,就停掉循環,不然加1. if ((v = ws[((i << 1) | 1) & m]) != null) { if ((v.scanState & SCANNING) != 0) break; ++nbusy; } } //3.3若是非穩態(飽合度不是tc的2倍),或者ctl髒讀,則返回false. if (nbusy != (tc << 1) || ctl != c) canBlock = false; // unstable or stale else if (tc >= pc && ac > 1 && w.isEmpty()) { //3.4處於穩態且ctl還有效,總worker數大於並行度且活躍數大於1並且當前w又是空的.嘗試將ctl減去一個活躍位. long nc = ((AC_MASK & (c - AC_UNIT)) | (~AC_MASK & c)); // uncompensated 反補償,初看莫名其妙,調用者會在以後增長ac. //返回值爲cas是否成功. canBlock = U.compareAndSwapLong(this, CTL, c, nc); } else if (tc >= MAX_CAP || (this == common && tc >= pc + commonMaxSpares)) //3.5普通ForkJoinPool,總worker數達到MAX_CAP,或common池,總worker數量達到並行度+commonMaxSpares(默認256),拋出拒絕異常. throw new RejectedExecutionException( "Thread limit exceeded replacing blocked worker"); else { // similar to tryAddWorker boolean add = false; int rs; // CAS within lock //3.6.計算新的ctl,增長一個總worker數. long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT))); //加運行狀態鎖,池未進入終止態的狀況下,進行cas,隨後解鎖. if (((rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); //cas成功,則建立worker canBlock = add && createWorker(); // throws on exception } } return canBlock; }
此方法相對簡單許多,只是根據不一樣的當前線程池和參數隊列的狀態進行不一樣的操做.
1.調用者終止或線程池空態,返回false結束.
2.發現當前有worker正在空閒(阻塞等待新任務),釋放等待棧頂(前面已經論述並欣賞過這種奇怪的"棧").
3.線程池未進入穩態或者進入時讀取的ctl失效,返回false.
4.存在活躍worker且總worker數大於tc,調用者隊列實際又是空的,則減去一個活躍位.
5.總線程數超限,拋出異常.
6.其餘狀況,增長一個總worker數並建立worker.
//前面提過它不少次了,awaitJoin方法會在指定任務完成或者超時前嘗試幫助或阻塞自身. //參數w表明調用者,task爲目標任務,參數deadline是超時目標(非0).它會返回退出時的任務狀態. final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { //返回值. int s = 0; if (task != null && w != null) { //1.若未進入if必然返回0,進入條件是提供了task和w. //保存currentJoin ForkJoinTask<?> prevJoin = w.currentJoin; //將w的currentJoin暫時設置爲task. U.putOrderedObject(w, QCURRENTJOIN, task); //若是task是CountedCompleter類型,轉化並存放到cc. CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>)task : null; //2.循環. for (;;) { if ((s = task.status) < 0) //2.1目標task已完成,返回task的status. break; if (cc != null) //2.2目標task是CountedCompleter,調用前面介紹過的helpComplete方法,maxTasks不限(0). helpComplete(w, cc, 0); //2.3不然發現隊列w已空,或者非空,則嘗試從w中移除並執行task,若出現隊列w是空且任務不知道是否完成的狀況(t.doExec只是執行,不等結果), //此處也會拿到一個true,則調用前面介紹過的helpStealer去幫助小偷. else if (w.base == w.top || w.tryRemoveAndExec(task)) helpStealer(w, task); //2.4.幫助須要時間,double check,同2.1. if ((s = task.status) < 0) break; //2.5計算deadline有關的停頓時間ms. long ms, ns; if (deadline == 0L) ms = 0L;//未指定deadline,ms爲0 else if ((ns = deadline - System.nanoTime()) <= 0L) break;//要指定的deadline已經早於當前時間了,break返回上面的status else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) ms = 1L;//用上面的ns計算ms發現負數,重置ms爲1 //2.6,調用上面提到過的tryCompensate方法,傳入當前worker,若是獲得true的返回值,等待超時, //超時結束增長一個活躍位(前面提到tryCompensate方法最後加增長tc並建立worker,不增長ac,或者莫名其妙地減去了一個ac). if (tryCompensate(w)) { task.internalWait(ms); U.getAndAddLong(this, CTL, AC_UNIT); } } //3.最後恢復原來的currentJoin. U.putOrderedObject(w, QCURRENTJOIN, prevJoin); } return s; }
await方法只會在2.1,2.4,2.5三處結束,前兩處爲發現task結束,後一處是超時.返回的結果必定是返回時task的狀態.
接下來看一些專門針對掃描的方法.
//簡單的方法,嘗試找一個非空的偷盜隊列.使用相似簡化的scan的方式查取,可能返回null. //若是調用者想要嘗試使用隊列,必須在獲得空後屢次嘗試. private WorkQueue findNonEmptyStealQueue() { WorkQueue[] ws; int m; //隨機數r int r = ThreadLocalRandom.nextSecondarySeed(); //線程池不具有隊列,直接返回null. if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { //循環開始. for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; int b; if ((q = ws[k]) != null) { if ((b = q.base) - q.top < 0) //查到q這個WorkQueue,而且q非空,則將q返回. return q; //查到了空隊列q,則校驗和加上q的base. checkSum += b; } //前面屢次見過的判斷進入第二輪的辦法.發現ws從頭至尾都是null則break返回null,不然出現非null的空隊列則將校驗和置0繼續循環. if ((k = (k + 1) & m) == origin) { if (oldSum == (oldSum = checkSum)) break; checkSum = 0; } } } return null; } //運行任務,直到isQuiescent,本方法順帶維護ctl中的活躍數,可是全過程不會在任務不能找到的狀況下 //進行阻塞,而是進行從新掃描,直到全部其餘worker的隊列中都不能找出任務爲止. final void helpQuiescePool(WorkQueue w) { //保存當前偷取的任務. ForkJoinTask<?> ps = w.currentSteal; // save context //循環開始,active置true. for (boolean active = true;;) { long c; WorkQueue q; ForkJoinTask<?> t; int b; //1.先把本地任務執行完畢(每次循環掃描). w.execLocalTasks(); //2.查找到非空隊列的狀況,除了2之外的34在cas成功的狀況下都會終止循環. if ((q = findNonEmptyStealQueue()) != null) { //2.1通過3中成功減小了活躍數的狀況,下一次循環又掃描到了新的非空隊列,須要重激活. if (!active) { active = true; //活躍數從新加1. U.getAndAddLong(this, CTL, AC_UNIT); } //2.2再次判斷隊列非空,並從隊列內部數組的base起取出task if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { //將task置於currentSteal U.putOrderedObject(w, QCURRENTSTEAL, t); //執行task t.doExec(); //若是w的偷取任務數溢出,轉到池中. if (++w.nsteals < 0) w.transferStealCount(this); } } //3.仍舊保持active的狀況. else if (active) { //3.1能進到這裏,確定本輪循環未能進入2,說明未能發現非空隊列,計算新的ctl即nc,它是原ctl減去一個活躍單位. long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0) //3.2新的活躍數加上並行度還不大於0,即不能溢出,說明沒有活躍數,不進行cas了,直接break. //很明顯,上面計算nc的方法,首先ctl正常自己是負,若上面表達式爲正,惟一的解釋是線程池有活躍線程(前面講過,活躍一個加一個活躍單元,直到並行度爲止) //由於兩個表達式分別是前16位(在前面再補上16個1)和後16位求和. break; //3.3未能從3.2退出,說明nc表示當前有活躍數存在,進行cas,成功後active置false,不退出循環. //若下輪循環發現新的非空隊列,會在2.1處增長回來.若未能發現,會在4處加回來. if (U.compareAndSwapLong(this, CTL, c, nc)) active = false; } //4.前一輪循環進了3.3,當前循環未能進入2.1的狀況,判斷當前ctl活躍數加上並行度是非正,說明再建立並行度個數的worker也不能溢出.則再加回一個活躍數. else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) break; } //5.從新恢復currentSteal U.putOrderedObject(w, QCURRENTSTEAL, ps); } //獲取並移除一個本地或偷來的任務. final ForkJoinTask<?> nextTaskFor(WorkQueue w) { for (ForkJoinTask<?> t;;) { WorkQueue q; int b; //首先嚐試nextLocalTask本地任務. if ((t = w.nextLocalTask()) != null) return t; //獲取不到本地任務,嘗試從其餘隊列獲取非空隊列,獲取不到非空隊列,返回null. if ((q = findNonEmptyStealQueue()) == null) return null; //獲取到了非空隊列,從base處取任務,非空則返回,爲空則重複循環. if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) return t; } }
上面是一些針對掃描的方法,有前面的基礎,理解實現並不困難,再也不綴述.
接下來關注與終止有關的函數.
//前面不止一次提到過tryTerminate,說過它會嘗試終止或完成終止. //參數now若是設置爲true,則表示在runState進入SHUTDOWN關閉態(負)時無條件終止,不然須要在進入SHUTDOWN同時沒有work也沒有活躍worker的狀況下終止. //若是設置enable爲true,則下次調用時runState爲負,可直接進入關閉流程(若是有now爲true,則當即關). //若是當前線程池進入終止流程或已終止,返回true. private boolean tryTerminate(boolean now, boolean enable) { int rs; //common池不可關. if (this == common) // cannot shut down return false; //1.runState攔截和處理. if ((rs = runState) >= 0) { if (!enable) //1.1對於當前runState非負的狀況,若是沒有指定enable,返回false. return false; //1.2若是指定了enable,將加運行狀態鎖並更新runState的首位爲1,即runState下次進入時爲負.再也不進入1的攔截處理流程. rs = lockRunState(); unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); } //2.終止的處理流程. if ((rs & STOP) == 0) { //2.1.沒有指定即刻關閉,檢查是否線程池已進入靜寂態. if (!now) { //循環重複直到穩態.初始化校驗和機制. for (long oldSum = 0L;;) { WorkQueue[] ws; WorkQueue w; int m, b; long c; long checkSum = ctl;//校驗和取ctl if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) //2.1.1當前線程池還有活躍的worker(前面解釋過).此時應返回false return false; if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) //2.1.2線程池已經沒有隊列,直接break進入後續流程. break; //2.1.3從0開始遍歷到ws的最後一個隊列. for (int i = 0; i <= m; ++i) { if ((w = ws[i]) != null) { if ((b = w.base) != w.top || w.scanState >= 0 || w.currentSteal != null) { //只要發現任何一個隊列非空,或隊列未進入非活躍態(負)或隊列仍有偷來的任務未完成. //嘗試釋放棧頂worker並增長一個活躍數.並返回false,能夠據此從新檢查. tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); return false; } //隊列非null可是空隊列,給校驗和增長base. checkSum += b; if ((i & 1) == 0) //發現非worker的隊列,直接讓外部禁用. w.qlock = -1; } } //2.1.4校驗和一輪不變,break掉進入後置流程.即2.1.3中每一次取ws[i]都是null. if (oldSum == (oldSum = checkSum)) break; } } //2.2.到這一步,已經保證了全部的關閉條件.若尚未給運行狀態鎖加上stop標記, //則給它加上標記.此時再有其餘線程去嘗試關閉,會進不來2這個分支. if ((runState & STOP) == 0) { rs = lockRunState(); // enter STOP phase unlockRunState(rs, (rs & ~RSLOCK) | STOP); } } //3.通過前面的階段,已完成預處理或now檢查,可進入後置流程. int pass = 0; // 3 passes to help terminate for (long oldSum = 0L;;) { // or until done or stable WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m; long checkSum = ctl; //3.1前面解釋過這個狀態表示當前無活躍worker. if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || (ws = workQueues) == null || (m = ws.length - 1) <= 0) { if ((runState & TERMINATED) == 0) { //在確保無worker活躍的狀況,直接將線程池置爲TERMINATED.並喚醒全部等待終結的線程. rs = lockRunState(); unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); synchronized (this) { notifyAll(); } } //到此必定是終結態了,退出循環,結束方法返回true. break; } //3.2內循環處理存在活躍worker的狀況.從第一個隊列開始遍歷. for (int i = 0; i <= m; ++i) { if ((w = ws[i]) != null) { //3.2.1對每一個非null隊列,增長一次校驗和並禁用隊列. checkSum += w.base; w.qlock = -1; if (pass > 0) { //3.2.2內循環初次pass爲0不能進入. //pass大於0,取消隊列上的全部任務,清理隊列. w.cancelAll(); if (pass > 1 && (wt = w.owner) != null) { //3.2.3 pass大於1而且隊列當前存在owner,擾動它. if (!wt.isInterrupted()) { try { wt.interrupt(); } catch (Throwable ignore) { } } if (w.scanState < 0) //3.2.4若是w表明的worker正在等待任務,讓它取消停頓,進入結束流程. U.unpark(wt); } } } } //3.3若是校驗和在幾輪(最大爲3或m的最大值)循環中改變過,說明並未進入穩態.將oldSum賦值爲新的checkSum並重置pass爲0. if (checkSum != oldSum) { oldSum = checkSum; pass = 0; } //3.4pass從未被歸置爲0,穩態增長到大於3且大於m的狀況,不能再幫助了,退出循環返回true. else if (pass > 3 && pass > m) break; //3.5pass未到臨界值,加1. else if (++pass > 1) { long c; int j = 0, sp; //每一次進入3.5都會執行一次循環.若是ctl表示有worker正在scan,最多m次嘗試release掉棧頂worker. //由於最多隻有m個worker在棧中阻塞.所以3.4是合理的. while (j++ <= m && (sp = (int)(c = ctl)) != 0) tryRelease(c, ws[sp & m], AC_UNIT); } } return true; }
tryTerminate方法的實現並不複雜,不過這裏有一點須要注意的地方:從方法中返回true,至少能夠理解爲進入了終止流程,但不必定表明已終止(即便是now的狀況),由於僅看方法的後半點,返回true時,線程池必定已經進入stop(從3.4break),或完成了terminated(從3.1break).
顯然,線程池的關閉必然會先經歷STOP,而後再TERMINATED,故前面全部的使用線程池的方法都是直接先判斷stop,由於若是線程池terminated了,那麼必定先stop.一樣,還有一個shutdown標記位來標記runState是否已進入負值,它小於0時(SHUTDOWN是最高位),則不能再接收新的任務.
其實從調用者能夠看出來它的幾種執行狀況.
顯然,在對線程進行解除註冊時,等待任務時和提交任務時,now和enable均會傳入false,若是沒有其餘地方提交調用了shutdown將runState的首位置1,這三個方法沒法經過註釋(1)處的代碼攔截.
shutdown會用enable的方式,將當前還沒有將runState置負的狀態置負,使得下一次調用deregisterWorker,awaitWork,externalSubmit,shutdown四個方法均能走後置的邏輯.
shutdownNow則兩個參數均會置true,會走完上面的全部邏輯.
下面來看externalSubmit等外部操做的方法.
//字面意思,從外面提交一任務入池.有前面的基礎後,此方法很容易理解. //此方法會處理一些不常見的case,好比輔助進行池的一些初始化過程(首次提交任務), //若是發現是首次外部線程提交任務,在ws的目標索引位置爲空或者出現競態,它會嘗試建立新的共享隊列. //參數task是目標任務,調用者必須保證非空. private void externalSubmit(ForkJoinTask<?> task) { int r; //初始化一個用於定位的隨機數r,前面曾簡單介紹過它和localInit,許多公司的分佈式id也是有它的成份. //而這個隨機數與線程至關於綁定在了一塊兒,所以,能夠使用它表示一個線程特有的東西. if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } //循環嘗試壓入任務. for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { //1.發現此時線程池的運行狀態已經進入SHUTDOWN,幫助終止線程池,並拋出拒絕異常. tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } //2.發現線程池還未初始化,輔助初始化.STARTED爲第三位. else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; //加鎖. rs = lockRunState(); try { //double check並嘗試初臺化stealCounter.它在awaitRunStateLock(嘗試加鎖)的時候會用來wait, //同時也處理了初始化階段的競態,還記得在awaitRunStateLock方法中發現stealCounter爲null時的註釋(初始化競態)嗎? if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); //建立workQueues數組,大小是2的冪.並保證至少有兩個插槽(語義理解,若是是4個的話,兩個share兩個獨有). int p = config & SMASK; //下面這個比較好理解,總之,最小的狀況下,n也會是(1+1)<<1=4,這樣保證有兩個位置給SHARE兩個位置給工做線程. //n的初始值取決於並行度. int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; //新的運行狀態. ns = STARTED; } } finally { //完成了輔助初始化,則解鎖,並置runState加上STARTED標識. unlockRunState(rs, (rs & ~RSLOCK) | ns); } } //3.某輪循環發現早已完成初始化,使用本線程的隨機數r計算索引,發現ws[k]存在.說明已被別的線程在此初始化了一個隊列. //注意索引k的值的計算,它與m進行與運算,保證不大於m,同時與SQMASK,即share-queue mask,它的值是0X007e,前面說過, //很明顯,它是整數的2至7位,保證了共享隊列只能放在ws的偶數位. else if ((q = ws[k = r & m & SQMASK]) != null) { //3.1對隊列加鎖. if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //取隊列的數組和top ForkJoinTask<?>[] a = q.array; int s = q.top; //初始化提交或者擴容. boolean submitted = false; try { // locked version of push //||左邊的語句指符合添加元素的條件,右邊表示若是不符合添加條件,則進行擴容. if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { //符合添加條件或擴容成功,取top對應的索引j. int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //向top放入task. U.putOrderedObject(a, j, task); //給top加1. U.putOrderedInt(q, QTOP, s + 1); //標記爲已提交. submitted = true; } } finally { //釋放qlock. U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { //若是提交成功,則嘗試喚醒top或建立一個worker(若是太少).並返回. signalWork(ws, q); return; } } //競態失敗,標記move move = true; // move on failure } //3.2計算出的位置沒有queue,且runState未鎖,建立一個新的. else if (((rs = runState) & RSLOCK) == 0) { // create new queue //共享隊列沒有owner. q = new WorkQueue(this, null); //隨機數就用線程的隨機數r. q.hint = r; //config的第32位置1表示共享隊列 q.config = k | SHARED_QUEUE; //隊列的scanState直接置爲INACTIVE,很明顯,參考前面的描述, //它沒有工做線程,也不會參與活化和scan阻塞的過程,也不會將本身的scanState壓入ctl後32位作棧元素. q.scanState = INACTIVE; //加鎖. rs = lockRunState(); if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) //仍舊符合添加條件,池未終結,將q賦給ws[k],不然的話,可能在下一輪循環進入1幫助終止, //也可能進入2用現成的隊列內的任務數組添加元素到top.也可能在4處發現競態,並最終致使5處重初始化r並從新循環找索引. ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } //4.標記繁忙. else move = true; // move if busy //5.本輪循環經歷2的競態失敗或4的繁忙,從新初始化一個r供下輪循環使用. if (move) r = ThreadLocalRandom.advanceProbe(r); } }
這個方法的邏輯相對簡單,用到的方法和字段基本都是前面說過的.
它的最終結果只有兩個:
1.任務提交入池,並喚醒正在scan的棧頂worker或建立一個新的worker(空閒太多).
2.終止了線程池並拋出拒絕異常.
看一看有關的幾個簡短方法.
//嘗試將給定的task添加到一個提交者當前的隊列中,若是還須要額外的初始化操做等,使用上面的externalSubmit. //咱們知道,絕大多數的狀況下,不須要初始化線程池的任務數組(整個線程池就一次),不須要初始化一個工做隊列(每一個ws一個位置只一次). //所以它至關於先嚐試用最簡單直接的辦法將任務壓入隊列,若是ws存在而隊列須要初始化,或者池自己就沒有完成初始化,再使用externalSubmit. //參數task是要提交的任務,調用者自己必須保證它非空. final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; //老辦法,初始的隨機數.運行狀態. int r = ThreadLocalRandom.getProbe(); int rs = runState; //快速壓入的代碼分支,條件是隊列數組ws已分配,非空,且根據r計算出來索引位取出的隊列 //存在且已完成初始化,線程池未進入SHUTDOWN,而且可以對隊列進行加鎖. if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //快速入隊. ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && //第二個條件是沒有到擴容條件. (am = a.length - 1) > (n = (s = q.top) - q.base)) { //計算出top的索引j,並將當前任務放入,將top加1 int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) //發現原來的隊列長度很短,有可能有worker正在scan,嘗試喚醒一個worker或添加一個worker signalWork(ws, q); //只要成功壓入,返回. return; } //最後解鎖. U.compareAndSwapInt(q, QLOCK, 1, 0); } //未能成功壓棧,緣由多是線程池未初始化,工做隊列未初始化,隊列達到擴容閾值等.使用externalSubmit進行. externalSubmit(task); } //嘗試彈出外部提交者的任務,找到隊列,非空時加鎖,最後調整top,每次進行都會檢查失敗,儘管不多失敗. //在前面ForkJoinTask和CountedCompleter等文章中曾引用過相關方法,此方法能夠令等待任務的線程 //自行將任務出隊並執行,而不是在池內線程還忙碌的狀況下乾等.可是該隊列可能被其餘外部線程放置了新的棧頂 //且看內部方法實現,當且僅當task是棧頂纔有用. final boolean tryExternalUnpush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s; //當前線程生成隨機數r. int r = ThreadLocalRandom.getProbe(); if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && (w = ws[m & r & SQMASK]) != null && (a = w.array) != null && (s = w.top) != w.base) { //進入if的條件.線程池已初始化,ws存在,w存在且w非空隊列.注意,仍舊取的偶數索引. //計算當前最頂部元素的索引j long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; //嘗試加鎖qlock,加鎖成功進入. if (U.compareAndSwapInt(w, QLOCK, 0, 1)) { //進一步check隊列w的top和w的array未變. if (w.top == s && w.array == a && //隊列w的頂部元素就是參數task U.getObject(a, j) == task && //成功將task出隊 U.compareAndSwapObject(a, j, task, null)) { //將top減1並釋放鎖,返回true. U.putOrderedInt(w, QTOP, s - 1); U.putOrderedInt(w, QLOCK, 0); return true; } //加鎖前已有更改或者task自己就不是頂部任務,直接解鎖.返回false. U.compareAndSwapInt(w, QLOCK, 1, 0); } } //默認返回fasle return false; } //外部提交者helpComplete.介紹CountedCompleter提過此方法. //當目標任務task是CountedCompleter類型時能夠手動調用CountedCompleter::helpComplete,它會調用此處,ForkJoinTask::get也有調用. //此方法能夠令外部線程在等待task時幫助completer棧鏈上它的子孫任務完成,從而加速task的完成. final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { WorkQueue[] ws; int n; int r = ThreadLocalRandom.getProbe(); return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 : //ws未初始化,返回0,不然返回helpComplete的結果,取w的方式不變. helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks); }
終於能夠看對外公有的api了,咱們使用ForkJoinPool的公有方法:
//invoke方法會嘗試將task壓入池,但也會當即join等待,壓入池的方法即前面介紹過的externalPush,一樣join方法也可能會 //致使當前線程自身完成了任務(池中工做線程忙碌而當前線程當即從隊列中獲取了該任務). //執行結束後返回該任務的執行結果,當出現異常時,直接從新拋出.但也可能拋出拒絕異常(拒絕入隊). public <T> T invoke(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task.join(); } //安排給定任務的執行,異步進行. public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); } //繼承自AbstractExecutorService的方法列表 //execute方法,傳入runnable,使用前面文章介紹的ForkJoinTask.RunnableExecuteAction適配器. public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.RunnableExecuteAction(task); externalPush(job); } //submit一個task,返回它自己. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } //對callable的適配,前面也提過. public <T> ForkJoinTask<T> submit(Callable<T> task) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); externalPush(job); return job; } //對task和result的適配. public <T> ForkJoinTask<T> submit(Runnable task, T result) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); externalPush(job); return job; } //對submit一個Runnable的適配.避免重複包裝.由於ForkJoinTask也能夠實現runnable. //典型的場景,先submit一個runnable,獲得返回的job,再將job給submit進去. public ForkJoinTask<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; } //執行全部任務.同樣先入隊再執行,可能出現本外部線程又偷回來執行的狀況. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); //標記是否有異常. boolean done = false; try { for (Callable<T> t : tasks) { ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); //把全部任務包成適配器並加入futures列表. futures.add(f); //壓入池. externalPush(f); } for (int i = 0, size = futures.size(); i < size; i++) //對每個任務進行靜默等待. ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); //上面的循環成功退出,置true返回. done = true; return futures; } finally { if (!done) //發現是異常退出,則依次取消任務. for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(false); } }
截止到此,ForkJoinPool中的主體難點方法已所有介紹完畢,下面選看一些周邊的有助於理解的簡單方法.
//估計當前線程池中正在運行(偷任務或運行任務)的線程,也就是未阻塞等待任務的線程.它會過分估計正在運行的線程數. public int getRunningThreadCount() { int rc = 0; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 1; i < ws.length; i += 2) { if ((w = ws[i]) != null && w.isApparentlyUnblocked()) //只取ws奇數索引的worker,只要它isApparentlyUnblocked,即未進入waiting,blocking,wating_timed. ++rc; } } return rc; } //估計當前正在進行偷取或執行任務的線程(未阻塞等待任務),此方法也會過分估計. public int getActiveThreadCount() { //很明顯,根據前面咱們研究了很久的邏輯,每release/signal的worker都會增長一個活躍數單元, //初始添加的worker也會增長一個活躍數單元和總數,顯然只要有active的,那麼r必然是一個溢出的正數. int r = (config & SMASK) + (int)(ctl >> AC_SHIFT); return (r <= 0) ? 0 : r; //忽略負值. } //判斷線程池此刻是否已經進入靜寂態,所謂的靜寂態是指當前線程池中全部worker都已經阻塞在等待任務了, //由於沒有任何任務可供他們偷取或執行,也沒有任何掛起的提交入池的任務.此方法相對保守,並非全部線程都空閒的狀況下 //當即會返回true,只有在他們減小了活躍數以後.(也就是保持空閒一段時間) public boolean isQuiescent() { //前面分析過,顯然這個表達式不大於0即爲不溢出的狀況,回憶前面關於scan時,終止時等的下降活躍數. return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0; }
與關閉有關的方法.
//此方法的執行參數,注意,now傳false,enable傳true. //它的執行結果很簡單(能夠參考前面的tryTerminate). //1.此前已經調用過tryTerminate並enable,或者調過shutdown,那會致使一次終結. //2.初次調用,前面提交過的任務繼續執行,但不會接受新的任務(由於runState首位置1了). //3.commonPool不準關. //4.已關的,調用了也沒什麼效果.但第二次調用時,已經在過程當中的任務可能受此影響取消. public void shutdown() { checkPermission(); tryTerminate(false, true); } //它會嘗試當即取消和中止全部的任務,拒絕後續提交的任務.若是是common池則無效果. //若是已經關閉,再調用無影響.正在被提交入池或正在執行的任務(在調用此方法執行時)可能會被取消,也可能不會(取決於時機,可能早於取消過程而完成執行). //它會取消掉已存在的任務或未執行的任務.方法總會返回一個空的list.(與其餘executor不一樣) public List<Runnable> shutdownNow() { checkPermission(); tryTerminate(true, true); return Collections.emptyList(); } //很好理解,runState的31位是1,而僅有在shutdown方法中全部worker都已閒置或ws爲空纔會加上此位.顯然此時全部任務都已完成. public boolean isTerminated() { return (runState & TERMINATED) != 0; }
//正在關閉中,必定有STOP標記位,沒有TERMINATED位.
public boolean isTerminating() { int rs = runState; return (rs & STOP) != 0 && (rs & TERMINATED) == 0; } //已SHUTDOWN,首位標記,顯然只要shutdown方法調用並傳enable爲true必定會有此結果. public boolean isShutdown() { return (runState & SHUTDOWN) != 0; } //等待一個shutdown請求後全部的任務完成或者發生超時,或者當前線程被擾動(第一優先級). //由於common池永遠不會隨程序調用shutdown而終止,所以使用commonPool調用此方法時, //會直接等效於awaitQuiescence,並且永遠會返回false. //返回true,表明當前線程池終止了,false表明超時了. public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { if (Thread.interrupted()) //1.當前線程中斷,拋出異常. throw new InterruptedException(); if (this == common) { //2.common池等效於awaitQuiescence並返回false. awaitQuiescence(timeout, unit); return false; } long nanos = unit.toNanos(timeout); if (isTerminated()) //3.發現全部任務已完成返回true. return true; if (nanos <= 0L) //4.已超時返回false. return false; //5.計算deadline並進入循環等待邏輯. long deadline = System.nanoTime() + nanos; synchronized (this) { for (;;) { if (isTerminated()) //5.1循環中發現已達到完成態,返回true. return true; if (nanos <= 0L) //5.2循環時發現超時,false. return false; //5.3循環時減小時間並等待. long millis = TimeUnit.NANOSECONDS.toMillis(nanos); wait(millis > 0L ? millis : 1L); nanos = deadline - System.nanoTime(); } } } //等待靜寂.若是當前線程是池內線程,等效於ForkJoinTask::helpQuiesce方法,不然只是等待. public boolean awaitQuiescence(long timeout, TimeUnit unit) { long nanos = unit.toNanos(timeout); ForkJoinWorkerThread wt; Thread thread = Thread.currentThread(); if ((thread instanceof ForkJoinWorkerThread) && (wt = (ForkJoinWorkerThread)thread).pool == this) { //1.線程是ForkJoinWorkerThread,幫助靜寂.返回true. helpQuiescePool(wt.workQueue); return true; } //2.不是池內線程,準備計時,初始化若干變量. long startTime = System.nanoTime(); WorkQueue[] ws; int r = 0, m; boolean found = true;//表明發現任務. //3.循環等待靜寂或超時. while (!isQuiescent() && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { //3.1有趣的地方,只有本輪沒找到任務纔會進行超時判斷. if (!found) { //3.1.1判斷超時了,返回false. if ((System.nanoTime() - startTime) > nanos) return false; //3.1.2沒超時,放棄執行權一段時間,不能阻塞在此. Thread.yield(); // cannot block } //改成false. found = false; //4.內循環從數組中間開始,一直遞減到0. for (int j = (m + 1) << 2; j >= 0; --j) { ForkJoinTask<?> t; WorkQueue q; int b, k; //4.1取隊列從0開始,只要取出了ws的非空隊列成員,進入邏輯. if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null && (b = q.base) - q.top < 0) { //4.2 found標記true found = true; if ((t = q.pollAt(b)) != null) //4.3嘗試從底部取出任務並執行.至關於幫助靜寂 t.doExec(); //進入4.1,即break掉內循環,可能湊巧,執行完一個任務就靜寂了. break; } } } //能從while循環break出來或者循環條件爲假退出,說明達到靜寂. return true; }
上面的代碼自己沒有什麼問題,可是已經涉及到了外部api,ForkJoinTask::helpQuiesce,此api是由咱們決定調用時機的,顯然,咱們能夠在任何一個入池的ForkJoinTask中執行此方法來幫助ForkJoinPool進入靜寂態,幫助執行全部待執行的任務,參考helpQuiescePool方法(會先執行本地任務,再偷其餘隊的任務執行).
到此,源碼只剩下一個blocker了.
//MangedBlocker接口.它是一個爲運行在ForkJoinPool中的任務維護並行度的接口 //咱們能夠經過拓展它來實如今ForkJoinPool中運行的任務的並行度管理.它只有兩個方法. //isReleasable方法會在沒有必要阻塞時必定返回true,block方法會在必要時阻塞當前線程, //它內部能夠調用isReleasable.而這個調度須要使用ForkJoinPool#managedBlock(ManagedBlocker) //它會嘗試去調度,避免長期的阻塞,它容許更靈活的內部處理. public static interface ManagedBlocker { //可能會阻塞一個線程,好比等待監視器,當返回true時表示認爲當前沒有必要繼續block. boolean block() throws InterruptedException; //返回true表示認爲沒有必要block. boolean isReleasable(); } //運行給定的阻塞任務,當在ForkJoinPool運行ForkJoinTask時,此方法在當前線程阻塞的狀況下(調用blocker.block), //認爲須要保持必要並行度時安排一個備用線程,方法內重複調用blocker.isReleasable和blocker.block.且前者必在後者前, //它返回false時纔會有後者.若是沒運行在ForkJoinPool內,那麼方法的行爲等效於下面這段代碼: //while(!blocker.isReleasable()){if(blocker.block() break;} //參數blocker是上面的接口的實現類,在前面的文章CompletableFuture和響應式編程中曾見到一個實現類. public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { ForkJoinPool p; ForkJoinWorkerThread wt; Thread t = Thread.currentThread(); //1.當前是ForkJoinPool池內線程時的邏輯. if ((t instanceof ForkJoinWorkerThread) && (p = (wt = (ForkJoinWorkerThread)t).pool) != null) { WorkQueue w = wt.workQueue; //1.1取出工做隊列,進行循環,blocker.isReleasable判斷當前並不是沒有必要加鎖時進入. while (!blocker.isReleasable()) { //1.2要加鎖,嘗試補償,它會在此時喚醒一個空閒的線程或建立一個新的線程來補償當前線程的阻塞. if (p.tryCompensate(w)) { try { //1.3當前線程阻塞等待. do {} while (!blocker.isReleasable() && !blocker.block()); } finally { U.getAndAddLong(p, CTL, AC_UNIT); } break; } } } //2.非池內線程的邏輯.同上面的阻塞邏輯. else { do {} while (!blocker.isReleasable() && !blocker.block()); } }
關於blocker咱們並不陌生,在CompletableFuture和響應式編程一文中,咱們提到了CompletableFuture中內部實現了一個blocker,並使用ForkJoinPool的managedBlock方法管理.還記得這方面的實現嗎?
CompletableFuture內部維護了一個相似棧的結構,用內部類Completion和它的子類們實現,而Completion自己是ForkJoinTask的子類.
一樣,使用CompletableFuture時,咱們能夠在入口方法入包含runAsyc之類的方法,該方法默認會提供一個線程池,而此線程池會由可用核數來決定,會選定一個ForkJoinPool或一個low逼的一任務一線程的線程池.
若是選擇了ForkJoinPool,顯然能及時補償一個工做線程的阻塞是很是有必要的,這也是提高性能之舉.
到此爲止,ForkJoinPool的源碼分析完畢.
這篇文章是ForkJoin框架系列的最後一篇,前面分析了ForkJoinPool的代碼,它是ForkJoin框架的核心,代碼較爲複雜,做者我的以爲它也是全部線程池中最複雜的一個,下面咱們來總結一下ForkJoinPool和整個ForkJoin框架。
ForkJoinTask是運行在ForkJoinPool的task,它定義了任務自身的入口api,維護了任務的status字段和result,結合ForkJoinPool來實現調度。ForkJoinTask必定會運行在一個ForkJoinPool中,若是沒有顯式地交它提交到ForkJoinPool,會使用一個common池(全進程共享)來執行任務。
ForkJoinTask支持fork和join,fork就是將當前task入池,join就是等待此task的結束並獲取結果。
CountedCompleter是一個另類的ForkJoinTask,它在ForkJoinTask基礎上維護了一個棧鏈,其實在某些視角上即像棧,又像一個不保存子節點的樹。同時它也不保存運行結果,使用它去getRawResult只能獲得null,可是任務的status會進行維護(委託給父類ForkJoinTask)。並行流是基於它來實現的,調度交由CountedCompleter完成,而原集的分割,結果的合併則由並行流的邏輯實現。
ForkJoinWorkerThread是運行在ForkJoinPool中的線程,它內部會維護一個存放ForkJoinTask的WorkQueue隊列,而WorkQueue是ForkJoinPool的內部類。
ForkJoinPool是框架的核心,不一樣於其餘線程池,它的構建不須要提供核心線程數,最大線程數,阻塞隊列等,還增長了未捕獲異常處理器,而該處理器會交給工做線程,由該線程處理,這樣的好處在於當一個線程的工做隊列上的某個任務出現異常時,不至於結束掉線程,而是讓它繼續運行隊列上的其餘任務。它會依託於並行度(或默認根據核數計算)來決定最大線程數,它內部維護了WorkQueue數組ws取代了阻塞隊列,ws中下標爲奇數的爲工做線程的所屬隊列,偶數的爲共享隊列,雖然名稱有所區分,但重要的區別只有一點:共享隊列不存在工做線程。
關於工做竊取,線程池外的提交者在join一個任務或get結果時,若是發現沒有完成,它不會幹等着工做線程,而是嘗試自行執行,當執行方法結束,任務尚未完成的狀況,它能夠幫助工做線程作一些其餘工做,好比當任務是CountedCompleter類型時,幫助完成位於棧鏈前方的子任務,而這個子任務先從當前worker隊列的top找,後從其餘隊列的base找;線程池中的工做線程會在任務入隊時被嘗試喚醒,會循環執行,每輪循環都會先嚐試隨機scan到一個任務(該任務可能屬於其餘線程),執行它,再執行本地任務,如此往復,scan的過程能夠理解爲一種竊取,當不能竊取時則會inactive;工做竊取時從隊列的base開始,工做壓入時從pop進入,執行本身隊列的任務時,依託於FIFO仍是LIFO的模式。此外,反向幫助小偷(helpStealer)也是一個「反彈式」的工做竊取,它與helpComplete一併屬於工做竊取的一部分。
ForkJoinPool維護了一個ctl控制信號,前16位表示活躍worker數,33至48位表示worker總數,後32位能夠粗略理解用於表示worker等待隊列的棧頂。ForkJoinPool利用這個ctl,WorkQueue的scanState和stackPred以及ws的索引算法維護了一個相似隊列(或者叫棧更貼切一些)的數據結構。每當有一個線程偷不到任務,就會存放此前的ctl後置標記位到pred,並將本身的索引交給ctl做爲棧頂。相應的喚醒操做則由棧頂起。相應的方法在進行嘗試添加worker時,會綜合當前是否有阻塞等待任務的線程。
當全部線程都不能竊取到新的任務,進入等待隊列時,稱之爲「靜寂態」。
ForkJoinPool對全局全狀的修改須要加鎖進行,這些操做如修改ctl(改變棧頂,增刪活躍數或總數等),處理ws中的元素,擴容ws,關閉線程池,初始化(包含ws的初始化),註冊線程入池等。而這個鎖就是runState,它除了當鎖,也間接表示了運行狀態,相應的線程池的SHUTDOWN,STOP,TERMINATED等狀態均與其相應的位有關。
線程池的並行度保存在config字段的後16位,config的第17位決定了是FIFO仍是LIFO。而這個並行度也經過間接地取反並計入到ctl的前32位,線程池中判斷是否當前有活躍的線程,或者是否已進入寂靜態,都是用保存在config的並行度和保存在ctl前32位的活躍數與並行度的運算結果進行相加,判斷是否會溢出(正數)來決定的。
ForkJoinPool還提供了補償機制,用於在線程將要阻塞在執行過程當中前釋放掉一個正在空閒的工做線程或建立一個新的工做線程,從而保證了並行度。第一篇文章中提到的CompletableFuture就是將Completion棧(ForkJoinTask)交給ForkJoinPool(取決於並行度)去執行並用它來進行調度。
ForkJoinPool的關閉則可能有多種場景:當一個worker被解除註冊時,嘗試一次,並不強關,也不指定enable,只有在線程池已經收到關閉信號並處在過程當中時,它纔會幫助關閉;工做線程由於scan不到work而不得不進行await,當它發現當前線程池已處於靜寂態,也嘗試同上的關閉線程池,一樣不強行關閉也不指定enable,只有線程池已經收到關閉信號並處在過程當中時,它纔會幫助關閉;線程池外提交任務時,發現線程池已收到關閉信號,嘗試幫助關閉;手動傳入關閉信號,即調用shutdown時,會指定非now,enable,則線程池將收到關閉信號,記錄該信號,並進行關閉流程,當下一次再有前述三種狀況調用時,必然能夠進入到關閉流程;即刻關閉,shutdownNow,它會要求即刻進入關閉,不會進入非now的狀況下的release以及等待靜寂等操做。
完。