JDK 1.7 時,標準類庫添加了 ForkJoinPool
,做爲對 Fork/Join 型線程池的實現。Fork 在英文中有 分叉 的意思,而 Join 有 合併 的意思。ForkJoinPool
的功能也是如此:Fork 將大任務分叉爲多個小任務,而後讓小任務執行,Join 是得到小任務的結果,而後進行合併,將合併的結果做爲大任務的結果 —— 而且這會是一個遞歸的過程 —— 由於任務若是足夠大,能夠將任務多級分叉直到任務足夠小。算法
因而可知,ForkJoinPool
能夠知足 並行 地實現 分治算法(Divide-and-Conquer) 的須要。併發
ForkJoinPool
的類圖以下:異步
能夠看到 ForkJoinPool
實現了 ExecutorService
接口,因此首先 ForkJoinPool
也是一個 ExecutorService
(線程池)。於是 Runnable
和 Callable
類型的任務,ForkJoinPool
也能夠經過 submit
、invokeAll
和 invokeAny
等方法來執行。可是標準類庫還爲 ForkJoinPool
定義了一種新的任務,它就是 ForkJoinTask<V>
。ide
ForkJoinTask
類圖:高併發
ForkJoinTask<V>
用來專門定義 Fork/Join 型任務 —— 完成將大任務分割爲小任務以及合併結果的工做。通常咱們不須要直接使用 ForkJoinTask<V>
,而是經過繼承它的子類 RecursiveAction
和 RecursiveTask
並實現對應的抽象方法 —— compute
,來定義咱們本身的任務。其中,RecursiveAction
是不帶返回值的 Fork/Join 型任務,因此使用此類任務並不產生結果,也就不涉及到結果的合併;而 RecursiveTask
是帶返回值的 Fork/Join 型任務,使用此類任務的話,在任務結束前,咱們須要進行結果的合併。其中,經過 ForkJoinTask<V>
的 fork
方法,咱們能夠產生子任務並執行;經過 join
方法,咱們能夠得到子任務的結果。this
ForkJoinPool
可使用三種方法用來執行 ForkJoinTask
:spa
invoke
方法:線程
invoke
方法用來執行一個帶返回值的任務(一般繼承自RecursiveTask
),而且該方法是阻塞的,直到任務執行完畢,該方法纔會中止阻塞並返回任務的執行結果。3d
submit
方法:code
除了從 ExecutorService
繼承的 submit
方法外,ForkJoinPool
還定義了用來執行 ForkJoinTask
的 submit
方法 —— 通常該 submit
方法用來執行帶返回值的ForkJoinTask
(一般繼承自RecursiveTask
)。該方法是非阻塞的,調用以後將任務提交給 ForkJoinPool
去執行便當即返回,返回的即是已經提交到 ForkJoinPool
去執行的 task —— 由類圖可知 ForkJoinTask
實現了 Future
接口,因此能夠直接經過 task 來和已經提交的任務進行交互。
execute
方法:
除了從 Executor
得到的 execute
方法外,ForkJoinPool
也定義了用來執行ForkJoinTask
的 execute
方法 —— 通常該 execute
方法用來執行不帶返回值的ForkJoinTask
(一般繼承自RecursiveAction
) ,該方法一樣是非阻塞的。
如今讓咱們來實踐下 ForkJoinPool
的功能:計算 π 的值。計算 π 的值有一個經過多項式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),並且多項式的項數越多,計算出的 π 的值越精確。
首先咱們定義用來估算 π 的 PiEstimateTask
:
class PiEstimateTask extends RecursiveTask<Double> { private final long begin; private final long end; private final long threshold; // 分割任務的臨界值 public PiEstimateTask(long begin, long end, long threshold) { this.begin = begin; this.end = end; this.threshold = threshold; } @Override protected Double compute() { // 實現 compute 方法 if (end - begin <= threshold) { // 臨界值之下,再也不分割,直接計算 int sign; // 符號,多項式中偶數位取 1,奇數位取 -1(位置從 0 開始) double result = 0.0; for (long i = begin; i < end; i++) { sign = (i & 1) == 0 ? 1 : -1; result += sign / (i * 2.0 + 1); } return result * 4; } // 分割任務 long middle = (begin + end) / 2; PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold); PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold); leftTask.fork(); // 異步執行 leftTask rightTask.fork(); // 異步執行 rightTask double leftResult = leftTask.join(); // 阻塞,直到 leftTask 執行完畢返回結果 double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執行完畢返回結果 return leftResult + rightResult; // 合併結果 } }
而後咱們使用 ForkJoinPool
的 invoke
執行 PiEstimateTask
:
public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 計算 10 億項,分割任務的臨界值爲 1 千萬 PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); double pi = forkJoinPool.invoke(task); // 阻塞,直到任務執行完畢返回結果 System.out.println("π 的值:" + pi); forkJoinPool.shutdown(); // 向線程池發送關閉的指令 } }
運行結果:
咱們也可使用 submit
方法異步的執行任務(此處 submit
方法返回的 future 指向的對象即提交任務時的 task):
public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); Future<Double> future = forkJoinPool.submit(task); // 不阻塞 double pi = future.get(); System.out.println("π 的值:" + pi); System.out.println("future 指向的對象是 task 嗎:" + (future == task)); forkJoinPool.shutdown(); // 向線程池發送關閉的指令 }
運行結果:
值得注意的是,選取一個合適的分割任務的臨界值,對 ForkJoinPool
執行任務的效率有着相當重要的影響。臨界值選取過大,任務分割的不夠細,則不能充分利用 CPU;臨界值選取太小,則任務分割過多,可能產生過多的子任務,致使過多的線程間的切換和加劇 GC 的負擔從而影響了效率。因此,須要根據實際的應用場景選擇一個合適的分割任務的臨界值。
ForkJoinPool
相比於 ThreadPoolExecutor
,還有一個很是重要的特色(優勢)在於,ForkJoinPool
具備 Work-Stealing (工做竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool
中的實現爲:線程池中每一個線程都有一個互不影響的任務隊列(雙端隊列),線程每次都從本身的任務隊列的隊頭中取出一個任務來運行;若是某個線程對應的隊列已空而且處於空閒狀態,而其餘線程的隊列中還有任務須要處理可是該線程處於工做狀態,那麼空閒的線程能夠從其餘線程的隊列的隊尾取一個任務來幫忙運行 —— 感受就像是空閒的線程去偷人家的任務來運行同樣,因此叫 「工做竊取」。
Work-Stealing 的適用場景是不一樣的任務的耗時相差比較大,即某些任務須要運行較長時間,而某些任務會很快的運行完成,這種狀況下用 Work-Stealing 很合適;可是若是任務的耗時很平均,則此時 Work-Stealing 並不適合,由於竊取任務時不一樣線程須要搶佔鎖,這可能會形成額外的時間消耗,並且每一個線程維護雙端隊列也會形成更大的內存消耗。因此 ForkJoinPool
並非 ThreadPoolExecutor
的替代品,而是做爲對 ThreadPoolExecutor
的補充。
總結:ForkJoinPool
和 ThreadPoolExecutor
都是 ExecutorService
(線程池),但ForkJoinPool
的獨特色在於:
ThreadPoolExecutor
只能執行 Runnable
和 Callable
任務,而 ForkJoinPool
不只能夠執行 Runnable
和 Callable
任務,還能夠執行 Fork/Join 型任務 —— ForkJoinTask
—— 從而知足並行地實現分治算法的須要;ThreadPoolExecutor
中任務的執行順序是按照其在共享隊列中的順序來執行的,因此後面的任務須要等待前面任務執行完畢後才能執行,而 ForkJoinPool
每一個線程有本身的任務隊列,並在此基礎上實現了 Work-Stealing 的功能,使得在某些狀況下 ForkJoinPool
能更大程度的提升併發效率。