在前面的文章"CompletableFuture和響應式編程"中提到了ForkJoinTask和ForkJoinPool,後者毫無疑問是一個線程池,前者則是一個相似FutureTask經典定義的概念.node
官方有一個很是無語的解釋:ForkJoinTask就是運行在ForkJoinPool的一個任務抽象,ForkJoinPool就是運行ForkJoinTask的線程池.算法
ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子類,它的核心在於分治和工做竅取,最大程度利用線程池中的工做線程,避免忙的忙死,餓的餓死.編程
ForkJoinTask能夠理解爲類線程但比線程輕量的實體,在ForkJoinPool中運行的少許ForkJoinWorkerThread能夠持有大量的ForkJoinTask和它的子任務.ForkJoinTask同時也是一個輕量的Future,使用時應避免較長阻塞和io.api
ForkJoinTask在JAVA8中應用普遍,但它是一個抽象類,它的子類派生了各類用途,如後續計劃單獨介紹的CountedCompleter,以及若干JAVA8中stream api定義的與並行流有關的各類操做(ops).數組
首先看ForkJoinTask的簽名.併發
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
從簽名上看,ForkJoinTask實現了future,也能夠序列化,但它不是一個Runnable或Callable.app
ForkJoinTask雖然能夠序列化,但它只對運行前和後敏感,對於執行過程當中不敏感.框架
先來看task的運行字段:異步
//volatie修飾的任務狀態值,由ForkJoinPool或工做線程修改. volatile int status; static final int DONE_MASK = 0xf0000000;//用於屏蔽完成狀態位. static final int NORMAL = 0xf0000000;//表示正常完成,是負值. static final int CANCELLED = 0xc0000000;//表示被取消,負值,且小於NORMAL static final int EXCEPTIONAL = 0x80000000;//異常完成,負值,且小於CANCELLED static final int SIGNAL = 0x00010000;//用於signal,必須不小於1<<16,默認爲1<<16. static final int SMASK = 0x0000ffff;//後十六位的task標籤
很顯然,DONE_MASK可以過濾掉全部非NORMAL,非CANCELLED,非EXCEPTIONAL的狀態,字段的含義也很直白,後面的SIGNAL和SMASK還不明確,後面再看.ide
//標記當前task的completion狀態,同時根據狀況喚醒等待該task的線程. private int setCompletion(int completion) { for (int s;;) { //開啓一個循環,若是當前task的status已是各類完成(小於0),則直接返回status,這個status多是某一次循環前被其餘線程完成. if ((s = status) < 0) return s; //嘗試將原來的status設置爲它與completion按位或的結果. if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { if ((s >>> 16) != 0) //此處體現了SIGNAL的標記做用,很明顯,只要task完成(包含取消或異常),或completion傳入的值不小於1<<16, //就能夠起到喚醒其餘線程的做用. synchronized (this) { notifyAll(); } //cas成功,返回參數中的completion. return completion; } } }
前面用註釋解釋了這個方法的邏輯,顯然該方法是阻塞的,若是傳入的參數不能將status設置爲負值會如何?
顯然,可能會有至多一次的成功cas,而且若知足喚醒的條件,會嘗試去喚醒線程,甚至可能由於爲了喚醒其餘線程而被阻塞在synchonized代碼塊外;也可能沒有一次成功的cas,直到其餘線程成功將status置爲完成.
//final修飾,運行ForkJoinTask的核心方法. final int doExec() { int s; boolean completed; //僅未完成的任務會運行,其餘狀況會忽略. if ((s = status) >= 0) { try { //調用exec completed = exec(); } catch (Throwable rex) { //發生異常,用setExceptionalCompletion設置結果 return setExceptionalCompletion(rex); } if (completed) //正常完成,調用前面說過的setCompletion,參數爲normal,並將返回值做爲結果s. s = setCompletion(NORMAL); } //返回s return s; } //記錄異常而且在符合條件時傳播異常行爲 private int setExceptionalCompletion(Throwable ex) { //首先記錄異常信息到結果 int s = recordExceptionalCompletion(ex); if ((s & DONE_MASK) == EXCEPTIONAL) //status去除非完成態標誌位(只保留前4位),等於EXCEPTIONAL.內部傳播異常 internalPropagateException(ex); return s; } //internalPropagateException方法是一個空方法,留給子類實現,可用於completer之間的異常傳遞 void internalPropagateException(Throwable ex) {} //記錄異常完成 final int recordExceptionalCompletion(Throwable ex) { int s; if ((s = status) >= 0) { //只能是異常態的status能夠記錄. //hash值禁止重寫,不使用子類的hashcode函數. int h = System.identityHashCode(this); final ReentrantLock lock = exceptionTableLock; //異常鎖,加鎖 lock.lock(); try { //抹除髒異常,後面敘述 expungeStaleExceptions(); //異常表數組.ExceptionNode後面敘述. ExceptionNode[] t = exceptionTable;//exceptionTable是一個全局的靜態常量,後面敘述 //用hash值和數組長度進行與運算求一個初始的索引 int i = h & (t.length - 1); for (ExceptionNode e = t[i]; ; e = e.next) { //找到空的索引位,就建立一個新的ExceptionNode,保存this,異常對象並退出循環 if (e == null) { t[i] = new ExceptionNode(this, ex, t[i]);//(1) break; } if (e.get() == this) //已設置在相同的索引位置的鏈表中,退出循環.//2 break; //不然e指向t[i]的next,進入下個循環,直到發現判斷包裝this這個ForkJoinTask的ExceptionNode已經出如今t[i]這個鏈表並break(2), //或者直到e是null,意味着t[i]出發開始的鏈表並沒有包裝this的ExceptionNode,則將構建一個新的ExceptionNode並置換t[i], //將原t[i]置爲它的next(1).整個遍歷判斷和置換過程處在鎖中進行. } } finally { lock.unlock(); } //記錄成功,將當前task設置爲異常完成. s = setCompletion(EXCEPTIONAL); } return s; } //exceptionTable聲明 private static final ExceptionNode[] exceptionTable;//全局異常node表 private static final ReentrantLock exceptionTableLock;//上面用到的鎖,就是一個普通的可重入鎖. private static final ReferenceQueue<Object> exceptionTableRefQueue;//變量表引用隊列,後面詳述. private static final int EXCEPTION_MAP_CAPACITY = 32;//異常表的固定容量,不大,只有32並且是全局的. //初始化在一個靜態代碼塊. static { exceptionTableLock = new ReentrantLock(); exceptionTableRefQueue = new ReferenceQueue<Object>(); exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量 try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ForkJoinTask.class; STATUS = U.objectFieldOffset (k.getDeclaredField("status")); } catch (Exception e) { throw new Error(e); } } //先來看ExceptionNode內部類的實現 //簽名,實現了一個ForkJoinTask的弱引用. static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { final Throwable ex; ExceptionNode next; final long thrower; // use id not ref to avoid weak cycles final int hashCode; // store task hashCode before weak ref disappears ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { super(task, exceptionTableRefQueue);//指向弱引用的構造函數,保存引用爲task,隊列爲全局的exceptionTableRefQueue. this.ex = ex;//拋出的異常的引用 this.next = next;//數組中的ExceptionNode以鏈表形式存在,前面分析過,先入者爲後入者的next this.thrower = Thread.currentThread().getId();//保存拋出異常的線程id(嚴格來講是建立了this的線程) this.hashCode = System.identityHashCode(task);//哈希碼保存關聯task的哈希值. } } //清除掉異常表中的髒數據,僅在持有全局鎖時纔可以使用.前面看到在記錄新的異常信息時要進行一次清除嘗試 private static void expungeStaleExceptions() { //循環條件,全局exceptionTableRefQueue隊列不爲空,前面說過ExceptionNode是弱引用,當它被回收時會被放入此隊列. for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { //從隊首依次取出元素. if (x instanceof ExceptionNode) { //計算在全局exceptionTable中的索引. int hashCode = ((ExceptionNode)x).hashCode; ExceptionNode[] t = exceptionTable; int i = hashCode & (t.length - 1); //取出node ExceptionNode e = t[i]; ExceptionNode pred = null; //不停遍歷,直到e是null爲止. while (e != null) { //e的next ExceptionNode next = e.next;//2 //x是隊首出隊的元素.它與e相等說明找到 if (e == x) { //e是一個鏈表的元素,pred表示它是否有前置元素 if (pred == null) //無前置元素,說明e在鏈表首部,直接將首部元素指向next便可. t[i] = next; else //有前置元素,說明循環過若干次,將當前e出鏈表 pred.next = next; //在鏈表中發現x即break掉內循環,繼續從exceptionTableRefQueue的隊首彈出新的元素. break; } //只要發現當前e不是x,準備下一次循環,pred指向e.e指向next,進行下一個元素的比較. pred = e; e = next; } } } }
到此doExec(也是每一個ForkJoinTask的執行核心過程)就此結束.
很明顯,ForkJoinTask的doExec負責了核心的執行,它留下了exec方法給子類實現,而重點負責了後面出現異常狀況的處理.處理的邏輯前面已論述,在產生異常時嘗試將異常存放在全局的execptionTable中,存放的結構爲數組+鏈表,按哈希值指定索引,每次存放新的異常時,順便清理上一次已被gc回收的ExceptionNode.全部ForkJoinTask共享了一個exceptionTable,所以必然在有關的幾個環節要進行及時的清理.除了剛剛論述的過程,還有以下的幾處:
前面論述了recordExceptionalCompletion,一共有四處使用了expungeStaleException,將已回收的ExceptionNode從引用隊列中清除.
clearExceptionalCompletion在對一個ForkJoinTask從新初始化時使用,咱們在前面提到序列化時說過,ForkJoinTask的序列化結果只保留了兩種狀況:運行前,運行結束.從新初始化一個ForkJoinTask,就要去除任何中間狀態,包含自身產出的已被回收的異常node,而expungeStaleExceptions顯然也順便幫助其餘task清除.
getThrowableException是查詢task運行結果時調用,如一些get/join方法,很明顯,記錄這個異常的做用就在於返回給get/join,在這一塊順便清理已被回收的node,尤爲是將本身運行時生成的node清除.
helpExpungeStaleExceptions是提供給ForkJoinPool在卸載worker時使用,順便幫助清理全局異常表.
使用它們的方法稍後再論述,先來繼續看ForkJoinTask的源碼.
//內部等待任務完成,直到完成或超時. final void internalWait(long timeout) { int s; //status小於0表明已完成,直接忽略wait. //未完成,則試着加上SIGNAL的標記,令完成任務的線程喚醒這個等待. if ((s = status) >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //加鎖,只有一個線程能夠進入. synchronized (this) { //再次判斷未完成.等待timeout,且忽略擾動異常. if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else //已完成則響醒其餘等待者. notifyAll(); } } }
internalWait方法邏輯很簡單,首先判斷是否未完成,知足未完成,則將標記位加上SIGNAL(可能已有別的線程作過),隨後加鎖double check status,還未完成則等待並釋放鎖,若發現已完成,或在後續被喚醒後發現已完成,則喚醒其餘等待線程.經過notifyAll的方式避免了通知丟失.
同時,它的使用方法目前只有一個ForkJoinPool::awaitJoin,在該方法中使用循環的方式進行internalWait,知足了每次按截止時間或週期進行等待,同時也順便解決了虛假喚醒.
繼續看externalAwaitDone函數.它體現了ForkJoin框架的一個核心:外部幫助.
//外部線程等待一個common池中的任務完成. private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? //當前task是一個CountedCompleter,嘗試使用common ForkJoinPool去外部幫助完成,並將完成狀態返回. ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : //當前task不是CountedCompleter,則調用common pool嘗試外部彈出該任務並進行執行, //status賦值doExec函數的結果,若彈出失敗(其餘線程先行彈出)賦0. ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { //檢查上一步的結果,即外部使用common池彈出並執行的結果(不是CountedCompleter的狀況),或外部嘗試幫助CountedCompleter完成的結果 //status大於0表示嘗試幫助完成失敗. //擾動標識,初值false boolean interrupted = false; do { //循環嘗試,先給status標記SIGNAL標識,便於後續喚醒操做. if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { //CAS成功,進同步塊發現double check未完成,則等待. wait(0L); } catch (InterruptedException ie) { //若在等待過程當中發生了擾動,不中止等待,標記擾動. interrupted = true; } } else //進同步塊發現已完成,則喚醒全部等待線程. notifyAll(); } } } while ((s = status) >= 0);//循環條件,task未完成. if (interrupted) //循環結束,若循環中間曾有擾動,則中斷當前線程. Thread.currentThread().interrupt(); } //返回status return s; }
externalAwaitDone的邏輯不復雜,在當前task爲ForkJoinPool.common的狀況下能夠在外部進行等待和嘗試幫助完成.方法會首先根據ForkJoinTask的類型進行嘗試幫助,並返回當前的status,若發現未完成,則進入下面的等待喚醒邏輯.該方法的調用者爲非worker線程.
類似的方法:externalInterruptibleAwaitDone
private int externalInterruptibleAwaitDone() throws InterruptedException { int s; //不一樣於externalAwaitDone,入口處發現當前線程已中斷,則當即拋出中斷異常. if (Thread.interrupted()) throw new InterruptedException(); if ((s = status) >= 0 && (s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0)) >= 0) { while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) //wait時也不catch中斷異常,發生即拋出. wait(0L); else notifyAll(); } } } } return s; }
externalInterruptibleAwaitDone的邏輯與externalAwaitDone類似,只是對中斷異常的態度爲拋,後者爲catch.
它們的使用點,externalAwaitDone爲doJoin或doInvoke方法調用,externalInterruptibleAwaitDone爲get方法調用,很明顯,join操做不可擾動,get則能夠擾動.
下面來看看doJoin和doInvoke
//join的核心方法 private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //已完成,返回status,未完成再嘗試後續 return (s = status) < 0 ? s : //未完成,當前線程是ForkJoinWorkerThread,從該線程中取出workQueue,並嘗試將 //當前task出隊而後執行,執行的結果是完成則返回狀態,不然使用當線程池所在的ForkJoinPool的awaitJoin方法等待. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : //當前線程不是ForkJoinWorkerThread,調用前面說的externalAwaitDone方法. externalAwaitDone(); } //invoke的核心方法 private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; //先嚐試本線程執行,不成功才走後續流程 return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //與上一個方法基本相同,但在當前線程是ForkJoinWorkerThread時不嘗試將該task移除棧並執行,而是等 (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
到此終於能夠看一些公有對外方法了.有了前面的基礎,再看get,join,invoke等方法很是簡單.
//get方法還有get(long time)的變種. public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? //當前線程是ForkJoinWorkerThread則調用前面提過的doJoin方法. //不然調用前述externalInterruptibleAwaitDone doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) //異常處理,取消的任務,拋出CancellationException. throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //異常處理,調用getThrowableException獲取異常,封進ExecutionException. throw new ExecutionException(ex); //無異常處理,返回原始結果. return getRawResult(); } //getRawResult默認爲一個抽象實現,在ForkJoinTask中,並未保存該結果的字段. public abstract V getRawResult(); //getThrowableException方法 private Throwable getThrowableException() { //不是異常標識,直接返回null,從方法名的字面意思看,要返回一個可拋出的異常. if ((status & DONE_MASK) != EXCEPTIONAL) return null; //系統哈希碼來定位ExceptionNode int h = System.identityHashCode(this); ExceptionNode e; final ReentrantLock lock = exceptionTableLock; //加異常表全局鎖 lock.lock(); try { //先清理已被回收的異常node,前面已述. expungeStaleExceptions(); ExceptionNode[] t = exceptionTable; e = t[h & (t.length - 1)]; //循環找出this匹配的異常node while (e != null && e.get() != this) e = e.next; } finally { lock.unlock(); } Throwable ex; //前面找不出異常node或異常node中存放的異常爲null,則返回null if (e == null || (ex = e.ex) == null) return null; if (e.thrower != Thread.currentThread().getId()) { //不是當前線程拋出的異常. Class<? extends Throwable> ec = ex.getClass(); try { Constructor<?> noArgCtor = null;//該異常的無參構造器 Constructor<?>[] cs = ec.getConstructors();//該異常類公有構造器 for (int i = 0; i < cs.length; ++i) { Constructor<?> c = cs[i]; Class<?>[] ps = c.getParameterTypes(); if (ps.length == 0) //構建器參數列表長度0說明存在無參構造器,存放. noArgCtor = c; else if (ps.length == 1 && ps[0] == Throwable.class) { //發現有參構造器且參數長度1且第一個參數類型是Throwable,說明能夠存放cause. //反射將前面取出的ex做爲參數,反射調用該構造器建立一個要拋出的Throwable. Throwable wx = (Throwable)c.newInstance(ex); //反射失敗,異常會被catch,返回ex,不然返回wx. return (wx == null) ? ex : wx; } } if (noArgCtor != null) { //在嘗試了尋找有參無參構造器,並發現只存在無參構造器的狀況,用無參構造器初始化異常. Throwable wx = (Throwable)(noArgCtor.newInstance()); if (wx != null) { //將ex設置爲它的cause並返回它的實例. wx.initCause(ex); return wx; } } } catch (Exception ignore) { //此方法不可拋出異常,必定要成功返回. } } //有參無參均未成功,返回找到的異常. return ex; } //join公有方法 public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) //調用doJoin方法阻塞等待的結果不是NORMAL,說明有異常或取消.報告異常. reportException(s); //等於NORMAL,正常執行完畢,返回原始結果. return getRawResult(); } //報告異常,可在前一步判斷執行status是否爲異常態,而後獲取並重拋異常. private void reportException(int s) { //參數s必須用DONE_MASK處理掉前4位之後的位. if (s == CANCELLED) //傳入的狀態碼等於取消,拋出取消異常. throw new CancellationException(); if (s == EXCEPTIONAL) //使用前面的getThrowableException方法獲取異常並從新拋出. rethrow(getThrowableException()); } //invoke公有方法. public final V invoke() { int s; //先嚐試執行 if ((s = doInvoke() & DONE_MASK) != NORMAL) //doInvoke方法的結果status只保留完成態位表示非NORMAL,則報告異常. reportException(s); //正常完成,返回原始結果. return getRawResult(); }
終於,讀到此處的讀者將關鍵的方法線串了起來,前述的全部內部方法,常量和變量與公有接口的關係已經明瞭.
很顯然,ForkJoinTask是個抽象類,且它並未保存任務的完成結果,也不負責這個結果的處理,但聲明並約束了返回結果的抽象方法getRawResult供子類實現.
所以,ForkJoinTask的自身關注任務的完成/異常/未完成,子類關注這個結果的處理.
每當獲取到任務的執行狀態時,ForkJoinTask可根據status來判斷是不是異常/正常完成,並進入相應的處理邏輯,最終使用子類實現的方法完成一個閉環.
若是理解爲將ForkJoinTask和子類的有關代碼合併起來,在結果/完成狀態/異常信息這一塊,至關於同時有三個part在合做.
第一個part:status字段,它同時表示了未完成/正常完成/取消/異常完成等狀態,也同時告訴有關等待線程是否要喚醒其餘線程(每一個線程等待前會設置SIGNAL),同時留出了後面16位對付其餘狀況.
第二個part:result,在ForkJoinTask見不到它,也沒有相應的字段,子類也未必須要提供這個result字段,前面提到的CountedCompleter就沒有提供這個result,它的getRawResult會固定返回null.可是CountedCompleter能夠繼承子類並實現這個result的保存與返回(道格大神在註釋中舉出了若干典型代碼例子),在JAVA8中,stream api中的並行流也會保存每一步的計算結果,並對結果進行合併.
第三個part:異常.在ForkJoinTask中已經完成了全部異常處理流程和執行流程的定義,重點在於異常的存放,它是由ForkJoinTask的類變量進行存放的,結構爲數組+鏈表,且元素利用了弱引用,借gc幫助清除掉已經被回收的ExceptionNode,顯然在gc以前必須獲得使用.而異常隨時能夠發生並進行record入列,但相應的能消費掉這個異常的只有相應的外部的get,join,invoke等方法或者內部擴展了exec()等方式,獲得其餘線程執行的task異常結果的狀況.巧妙的是,只有外部調用者調用(get,invoke,join)時,這個異常信息才足夠重要,須要rethrow出去並保存關鍵的堆棧信息;而內部線程在訪問一些非自身執行的任務時,每每只須要status判斷是否異常便可,在exec()中fork新任務的,也每每必須當即join這些新的子任務,這就保證了可以及時獲得子任務中的異常堆棧(即便拿不到堆棧也知道它失敗了).
通過前面的論述,ForkJoinTask的執行和異常處理已經基本論結,可是,一個ForkJoinTask在建立以後是如何運行的?顯然,它不是一個Runnable,也不是Callable,不能直接submit或execute到普通的線程池.
臨時切換到ForkJoinPool的代碼,前面提到過,ForkJoinTask的官方定義就是能夠運行在ForkJoinPool中的task.
//ForkJoinPool代碼,submit一個ForkJoinTask到ForkJoinPool,並將該task自身返回. //拿到返回的task,咱們就能夠進行前述的get方法了. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } //execute,不返回.相似普通線程池提交一個runnable的行爲. public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); }
顯然,若要使用一個自建的ForkJoinPool,可使用execute或submit函數提交入池,而後用前述的get方法和變種方法進行.這是一種運行task的方式.
前面論述過的invoke方法會先去先去嘗試本地執行,而後纔去等待,故咱們本身new一個ForkJoinTask,同樣能夠經過invoke直接執行,這是第二種運行task的方式.
前面論述的join方法在某種狀況下也是一種task的運行方式,在當前線程是ForkJoinWorkerThread時,會去嘗試將task出隊並doExec,也就是會先用本線程執行一次,不成功才幹等,非ForkJoinWorkerThread則直接乾等了.顯然咱們能夠本身構建一個ForkJoinWorkerThread並去join,這時會將任務出隊並執行(但存在一個問題:何時入隊).且出隊後若未執行成功,則awaitJoin(參考ForkJoinPool::awaitJoin),此時因任務已出隊,不會被竊取或幫助(在awaitJoin中會有helpStealer,但其實任務是當前線程本身"偷走"了),彷佛徹底要靠本身了.但並不表示ForkJoinTask子類沒法獲取這個已出隊的任務,好比CountedCompleter使用時,能夠在compute中新生成的Completer時,將源CountedCompleter(ForkJoinTask的子類)做爲新生成的CountedCountedCompleter的completer(該子類中的一個字段),這樣,如有一個ForkJoinWorkerThread竊取了這個新生成的CountedCompleter,能夠經過completer鏈表找到先前被出隊的CountedCompleter(ForkJoinTask).關於CountedCompleter單獨文章詳述.
除此以外呢?包含前面提到的使用join操做不是ForkJoinWorkerThread調用的狀況,不使用ForkJoinPool的submit execute入池,如何能讓一個ForkJoinTask在未來執行?咱們來看後面的方法.
//fork方法,將當前任務入池. public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) //若是當前線程是ForkJoinWorkerThread,將任務壓入該線程的任務隊列. ((ForkJoinWorkerThread)t).workQueue.push(this); else //不然調用common池的externalPush方法入隊. ForkJoinPool.common.externalPush(this); return this; }
顯然,咱們還能夠經過對一個ForkJoinTask進行fork方法入池,入哪一個池徹底取決於當前線程的類型.這是第四種讓任務能被運行的方式.
一樣,咱們也看到了第五種方式,ForkJoinPool.common其實就是一個常量保存的ForkJoinPool,它可以調用externalPush,咱們天然也能夠直接new一個ForkJoinPool,而後將當前task進行externalPush,字面意思外部壓入.這種辦法,非ForkJoinWorkerThread也能將任務提交到非common的ForkJoinPool.
從名字來看,ForkJoinTask彷佛已經說明了一切,按照官方的註釋也是如此.對一個task,先Fork壓隊,再Join等待執行結果,這是一個ForkJoinTask的執行週期閉環(但不要簡單理解爲生命週期,前面提到過,任務能夠被從新初始化,並且從新初始化時還會清空ExceptionNode數組上的已回收成員).
到此爲止,ForkJoinTask的核心函數和api已經基本瞭然,其它同類型的方法以及周邊的方法均不難理解,如invokeAll的各類變種.下面來看一些"周邊"類型的函數.有前述的基礎,它們很好理解.
//取消一個任務的執行,直接將status設置成CANCELLED,設置後判斷該status 是否爲CANCELLED,是則true不然false. public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } //判斷是否完成,status小於0表明正常完成/異常完成/取消,很好理解. public final boolean isDone() { return status < 0; } //判斷當前任務是否取消. public final boolean isCancelled() { //status前4位 return (status & DONE_MASK) == CANCELLED; } public final boolean isCompletedAbnormally() { //是否爲異常完成,前面說過,CANCELLED和EXCEPTIONAL均小於NORMAL return status < NORMAL; } //是否正常完成. public final boolean isCompletedNormally() { //完成態位等於NORMAL return (status & DONE_MASK) == NORMAL; } //獲取異常. public final Throwable getException() { int s = status & DONE_MASK; //當爲正常完成或未完成時,返回null. return ((s >= NORMAL) ? null : //是取消時,新建一個取消異常. (s == CANCELLED) ? new CancellationException() : //不是取消,參考前面提到的getThrowableException. getThrowableException()); } //使用異常完成任務. public void completeExceptionally(Throwable ex) { //參考前述的setExceptionalCompletion, //ex已是運行時異常或者Error,直接使用ex完成,如果受檢異常,包裝成運行時異常. setExceptionalCompletion((ex instanceof RuntimeException) || (ex instanceof Error) ? ex : new RuntimeException(ex)); } //使用value完成任務. public void complete(V value) { try { //設置原始結果,它是一個空方法.前面說過ForkJoinTask沒有維護result之類的結果字段,子類可自行發揮. setRawResult(value); } catch (Throwable rex) { //前述步驟出現異常,就用異常方式完成. setExceptionalCompletion(rex); return; } //前面的結果執行完,標記當前爲完成. setCompletion(NORMAL); } //安靜完成任務.直接用NORMAL setCompletion,沒什麼好說的. public final void quietlyComplete() { setCompletion(NORMAL); } //安靜join,它不會返回result也不會拋出異常.處理集合任務時,若是須要全部任務都被執行而不是一個執行出錯(取消)其餘也跟着出錯的狀況下, //很明顯適用,這不一樣於invokeAll,靜態方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)會在任何一個任務出現異常後取消執行並拋出. public final void quietlyJoin() { doJoin(); } //安靜執行一次,不返回結果不拋出異常,沒什麼好說的. public final void quietlyInvoke() { doInvoke(); } //從新初臺化當前task public void reinitialize() { if ((status & DONE_MASK) == EXCEPTIONAL) //若是當前任務是異常完成的,清除異常.該方法參考前面的論述. clearExceptionalCompletion(); else //不然重置status爲0. status = 0; } //反fork. public boolean tryUnfork() { Thread t; //當前線程是ForkJoinWorkerThread,從它的隊列嘗試移除. return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //當前線程不是ForkJoinWorkerThread,用common池外部移除. ForkJoinPool.common.tryExternalUnpush(this)); }
上面是一些簡單的周邊方法,大多並不須要再論述了,unfork方法很明顯在某些場景下不會成功,顯然,當一個任務剛剛入隊並未進行後續操做時,極可能成功.按前面所述,當對一個任務進行join時,可能會成功的彈出當前任務並執行,此時不可能再次彈出;當一個任務被其餘線程竊取或被它自己執行的也不會彈出.
再來看一些老朋友,在前面的文章"CompletableFuture和響應式編程"一文中,做者曾着重強調過它將每一個要執行的動做進行壓棧(未能當即執行的狀況),而棧中的元素Completion便是ForkJoinTask的子類,而標記該Completion是否被claim的方法和周邊方法以下:
//獲取ForkJoinTask的標記,返回結果爲short型 public final short getForkJoinTaskTag() { //status的後16位 return (short)status; } //原子設置任務的標記位. public final short setForkJoinTaskTag(short tag) { for (int s;;) { //不停循環地嘗試將status的後16位設置爲tag. if (U.compareAndSwapInt(this, STATUS, s = status, //替換的結果,前16位爲原status的前16位,後16位爲tag. (s & ~SMASK) | (tag & SMASK))) //返回被換掉的status的後16位. return (short)s; } } //循環嘗試原子設置標記位爲tag,前提是原來的標記位等於e,成功true失敗false public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { if ((short)(s = status) != e) //若是某一次循環的原標記位不是e,則返回false. return false; //同上個方法 if (U.compareAndSwapInt(this, STATUS, s, (s & ~SMASK) | (tag & SMASK))) return true; } }
還記得CompletableFuture在異步執行Completion時要先claim嗎?claim方法中,會嘗試設置這個標記位.這是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.
目前來看,在CompletableFuture的內部實現Completion尚未使用到ForkJoinTask的其餘屬性,好比放入一個ForkJoinPool執行(沒有任何前面總結的調用,好比用ForkJoinPool的push,execute,submit等,也沒有fork到common池).可是很明顯,道格大神令它繼承自ForkJoinTask不可能純粹只爲了使用區區一個標記位,試想一下,在如此友好支持響應式編程的CompletableFuture中傳入的每個action均可以生成若干新的action,那麼CompletableFuture負責將這些action封裝成Completion放入ForkJoinPool執行,將最大化利用到ForkJoin框架的工做竊取和外部幫助的功效,強力結合分治思想,這將是多麼優雅的設計.或者在jdk9-12中已經出現了相應的Completion實現(儘管做者寫過JAVA9-12,遺憾的是也沒有去翻它們的源碼).
另外,儘管Completion的衆多子類也沒有result之類的表示結果的字段,但它的一些子類經過封裝,實際上間接地將這個Completion所引用的dep的result做爲了本身的"result",固然,getRawResult依舊是null,可是理念倒是相通的.
以上是ForkJoinTask的部分核心源碼,除了上述的源碼外,還有一些同屬於ForkJoinTask的核心源碼部分,好比其餘的public方法(參考join fork invoke 便可),一些利用ForkJoinPool的實現,要深刻了解ForkJoinPool才能瞭解的方法,一些不太難的靜態方法等,這些沒有必要論述了.
除了核心源碼外,ForkJoinTask也提供了對Runnable,Callable的適配器實現,這塊很好理解,簡單看一看.
//對Runnable的實現,若是在ForkJoinPool中提交一個runnable,會用它封裝成ForkJoinTask static final class AdaptedRunnable<T> extends ForkJoinTask<T> implements RunnableFuture<T> { final Runnable runnable; T result; AdaptedRunnable(Runnable runnable, T result) { //不能沒有runnable if (runnable == null) throw new NullPointerException(); this.runnable = runnable; //對runnable作適配器時,能夠提交將結果傳入,並設置爲當前ForkJoinTask子類的result. //前面說過,ForkJoinTask不以result做爲完成標記,判斷一個任務是否完成或異常,使用status足以, //返回的結果才使用result. this.result = result; } public final T getRawResult() { return result; } public final void setRawResult(T v) { result = v; } //前面說過提交入池的ForkJoinTask最終會運行doExec,而它會調用exec,此處會調用run. public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L;//序列化用 } //無結果的runnable適配器 static final class AdaptedRunnableAction extends ForkJoinTask<Void> implements RunnableFuture<Void> { final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //區別就是result固定爲null,也不能set public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } //對runnable的適配器,但強制池中的工做線程在執行任務發現異常時拋出 static final class RunnableExecuteAction extends ForkJoinTask<Void> { final Runnable runnable; RunnableExecuteAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //默認null結果,set也是空實現 public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } void internalPropagateException(Throwable ex) { //前面說過doExec會被執行,它會調exec並catch,在catch塊中設置當前任務爲異常完成態, //而後調用internalPropagateException方法,而在ForkJoinTask中默認爲空實現. //此處將異常從新拋出,將形成worker線程拋出異常. rethrow(ex); } private static final long serialVersionUID = 5232453952276885070L; } //對callable的適配器,當將callable提交至ForkJoinPool時使用. static final class AdaptedCallable<T> extends ForkJoinTask<T> implements RunnableFuture<T> { final Callable<? extends T> callable; T result; AdaptedCallable(Callable<? extends T> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; } //字段中有一個result,直接使用它返回. public final T getRawResult() { return result; } //result可外部直接設置. public final void setRawResult(T v) { result = v; } public final boolean exec() { try { //默認的result用call函數設置. result = callable.call(); return true; } catch (Error err) { //catch住Error,拋出 throw err; } catch (RuntimeException rex) { //catch住運行時異常,拋出 throw rex; } catch (Exception ex) { //catch住受檢異常,包裝成運行時異常拋出. throw new RuntimeException(ex); } } //run方法同樣只是調用invoke,進而調用doExec. public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } //runnable生成適配器的工具方法 public static ForkJoinTask<?> adapt(Runnable runnable) { return new AdaptedRunnableAction(runnable); } //指定結果設置runnable的適配器工具方法 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { return new AdaptedRunnable<T>(runnable, result); } //對callable生成適配器的方法. public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { return new AdaptedCallable<T>(callable); }
以上的代碼都不復雜,只要熟悉了ForkJoinTask的自己代碼結構,對於這一塊瞭解很是容易,這也間接說明了ForkJoinPool中是如何處理Runnable和Callable的(由於ForkJoinPool自己也是一種線程池,能夠接受提交Callable和Runnable).
將runnable提交到pool時,能夠指定result,也能夠不指定,也能夠用submit或execute方法區分異常處理行爲,ForkJoinPool會自行選擇相應的適配器.
將callable 提交到pool時,pool會選擇對callable的適配器,它的結果將爲task的結果,它的異常將爲task的異常.
到此爲止,ForkJoinTask的源碼分析完成.
本文詳細分析了ForkJoinTask的源碼,並解釋了前文CompletableFuture中Completion與它的關聯,以及分析了Completion繼承自ForkJoinTask目前已帶來的功能利用(tag)和未來可能增長的功用(一個Completion產生若干多個Completion並在ForkJoinPool中運行,還支持工做竊取).
同時本文也對ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream api中的並行流進行了略微的描述.
在文章的最後,或許有一些新手讀者會好奇,咱們究竟何時會使用ForkJoinTask?
首先,若是你在項目中大肆使用了流式計算,並使用了並行流,那麼你已經在使用了.
前面提過,官方解釋ForkJoinTask能夠視做比線程輕量許多的實體,也是輕量的Future.結合在源碼中時不時出來秀存在感的ForkJoinWorkerThread,顯然它就是聽說比普通線程輕量一些的線程,在前面的源碼中能夠看出,它維護了一組任務的隊列,每一個線程負責完成隊列中的任務,也能夠偷其餘線程的任務,甚至池外的線程均可以時不時地來個join,順便幫助出隊執行任務.
顯然,對於重計算,輕io,輕阻塞的任務,適合使用ForkJoinPool,也就使用了ForkJoinTask,你不會認爲它能夠提交runnable和callable,就能夠不用ForkJoinTask了吧?前面的適配器ForkJoinPool在這種狀況下必用的,能夠去翻相應的源碼.
本章沒有去詳述CountedCompleter,但前面論述時說過,你能夠在exec()中將一個計算複雜的任務拆解爲小的子任務,而後將子任務入池執行,父任務合併子任務的結果.這種分治的算法此前基本是在單線程模式下運行,使用ForkJoinTask,則能夠將這種計算交給一個ForkJoinPool中的全部線程並行執行.