根據前文描述的Doug Lea的理論基礎,在JDK1.7中已經給出了Fork Join的實現。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask這樣5個類。本文就對JDK1.7中增長這5個工具類實現作簡要分析。java
0. JDK中ForkJoin實現概述數組
在JavaSE7的API和JDK1.7中,分別集成了支持ForkJoin的五個類:併發
ForkJoinPool維護了多個線程構成的數組,維護了任務提交隊列,給出了多個線程之間工做竊取的實現。給出了任務類型適配,和提交任務邏輯的實現。須要和線程緊密配合。框架
而ForkJoinWorkerThread則繼承了java.lang.Thread類,維護了線程本身的隊列,同一個任務fork()操做原則上會添加到同一個線程隊列中。而這個線程類須要和ForkJoinPool緊密合做,有指向對應ForkJoinPool對象的引用。less
ForkJoinTask則實現了Future接口,除了對接口的實現外,主要是fork()和join()操做。注意,貌似fork()只有ForkJoinWorkerThread 中才能執行。dom
兩個子類RecursiveAction和RecursiveTask則實現比較簡單,區別就在於返回值的處理不一樣。工具
1. ForkJoinPoolthis
ForkJoinPool是實現了 Fork Join 的線程池。看JDK源碼咱們知道ForkJoinPool是extends AbstractExecutorService的,也就是說間接地實現了Executor和ExecutorService接口。實際上也就意味着ForkJoinPool是繼ThreadPoolExecutor後的又一個Executor(Service)的具體實現。線程
1.1. 構建初始化code
咱們先看ForkJoinPool的構造方法,一共有3個重載的實現。有一個單參數的默認實現,一般咱們使用這個就足夠了,這最終會以默認的參數調用3參數的構造方法。咱們再來看3個參數的構造方法實現。其中:
1.2. 任務提交
前面已經提到,ForkJoinPool也是Executor(Service)的實現,那麼execute()和submit()這樣向ThreadPoolExecutor提交任務的方法對於ForkJoinPool來講也是同樣有效的。
須要說明的是,除了增長支持ForkJoinTask對象參數的重載實現外,還在Runnable和Callable參數的方法中對原始的Runnable和Callable對象作了到ForkJoinTask的適配,使用的分別是ForkJoinTask的靜態內部類AdaptedRunnable和AdaptedCallable的對象。而這兩個類型參數對應的方法最終都會調用ForkJoinTask參數的方法:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); return task; }
咱們接下來再看下任務提交中被調用到的forkOrSubmit()方法:
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else addSubmission(task); }
邏輯很容易理解,先判斷ForkJoinPool的狀態,若已中止,則拋異常返回。以後若是當前線程是ForkJoinWorkerThread類型的,則將任務追加到ForkJoinWorkerThread對象中維護的隊列上,不然將新的任務放入ForkJoinPool的提交隊列中,並通知線程工做。
1.3. 線程的啓動和工做
前面已經強調過,ForkJoinPool和ForkJoinWorkerThread是緊密相關,耦合在一塊兒的。Thread的start()會調用run(),而ForkJoinWorkerThread類重寫了run()方法,會調用對應的線程池ForkJoinPool對象的work()方法。
咱們來看一下work()方法的實現。
final void work(ForkJoinWorkerThread w) { boolean swept = false; // true on empty scans long c; while (!w.terminate && (int)(c = ctl) >= 0) { int a; // active count if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) swept = scan(w, a); else if (tryAwaitWork(w, c)) swept = false; } }
裏面主要是一個while循環體,只要當前的線程和線程池不是處於終止狀態,則這個循環一直執行。執行的內容則是這樣的,若是可以根據scan()方法獲得任務,並執行,不然進入阻塞狀態。
咱們來看一下scan()方法的實現。
private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; ForkJoinWorkerThread v = ws[k & m]; if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { int d = (v.queueBase = b + 1) - v.queueTop; v.stealHint = w.poolIndex; if (d != 0) signalWork(); // propagate if nonempty w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); return false; // store next seed } else if (j < 0) { // xorshift r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } if (scanGuard != g) // staleness check return false; else { // try to take submission ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; // all queues empty } }
看起來很複雜,實際的原理則很簡單,就是先嚐試作任務竊取( Work Stealing ),若是不知足條件則到提交隊列中獲取任務。而ForkJoinWorkerThread線程自己也維護了線程內fork和join任務操做獲得的隊列,結合起來,整體執行任務的順序就是:
1.4. ForkJoinPool的其它屬性
除了上述提到的操做,ForkJoin中還維護了
等數據屬性。
2. ForkJoinWorkerThread
ForkJoinWorkerThread擴展於Thread類,但提供了不少支持ForkJoin的特性。
上文在介紹ForkJoinPool的時候已經對這個類作了不少描述,也強調過線程類ForkJoinWorkerThread和ForkJoinPool相互依賴,放在一塊兒纔有意義。實際上,還要提到描述Fork Join任務的類ForkJoinTask。
除了上面提到的之外,對於ForkJoinWorkerThread這個類,再稍微提一下這樣幾個點:
3. ForkJoinTask及兩個抽象子類
ForkJoinTask是ForkJoin框架中的主體,是ForkJoin中任務的體現。這個類實現了Future和Serializable接口。除了Futrue接口要知足的方法外,我想有這樣3個方法是有必要知道的,分別是fork()、join()和exec()。
對於fork(),這個也許你們都很熟悉了,在這裏也就是分解出子任務的執行。這個在實現上很簡單那,就是在當前線程ForkJoinWorkerThread對象維護的隊列中加入新的子任務。實現以下:
public final ForkJoinTask fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this; }
須要注意的是fork()方法的調用是在當前線程對象爲ForkJoinWorkerThread的條件下。
咱們再來看看對應的join()實現:
public final V join() { if (doJoin() != NORMAL) return reportResult(); else return getRawResult(); }
顯然,它有調用了doJoin()方法,咱們再來深刻了解下。
private int doJoin() { Thread t; ForkJoinWorkerThread w; int s; boolean completed; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { if ((s = status) < 0) return s; if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) return setCompletion(NORMAL); } return w.joinTask(this); } else return externalAwaitDone(); }
大概的邏輯是這樣的,在當前線程對象爲ForkJoinWorkerThread的條件下,從隊列中取回當前任務ForkJoinTask對象,並嘗試在調用線程對其直接執行,不然當前線程調用wait()阻塞等待。更深刻的理解可續繼續查閱源碼。
最後,咱們再來看看exec()方法,這個是在ForkJoinTask中是沒有給出實現的。
在JDK中,有ForkJoinTask的兩個抽象子類RecursiveAction和RecursiveTask,他們分別給出了exec()的實現,這也是這兩個子類主要作的事情,其實是調用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未給出實現的。
實際上,compute()方法就是Fork Join要執行的內容,是Fork Join任務的實質,須要開發者給出。
而RecursiveAction和RecursiveTask就是方便開發者使用Fork Join的,RecursiveAction和RecursiveTask這兩個類的區別僅僅是返回結果的狀況不一樣。而這個compute()方法就是留給開發者繼承擴展使用的。這個會在下篇文章詳細講述。