談到線程池,不少人會想到Executors提供的一些預設的線程池,好比單線程線程池SingleThreadExecutor
,固定大小的線程池FixedThreadPool
,可是不多有人會注意到其中還提供了一種特殊的線程池:WorkStealingPool
,咱們點進這個方法,會看到和其餘方法不一樣的是,這種線程池並非經過ThreadPoolExecutor
來建立的,而是ForkJoinPool
來建立的:算法
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
複製代碼
這兩種線程池之間並非繼承關係,而是平級關係:數組
ThreadPoolExecutor應該都很瞭解了,就是一個基本的存儲線程的線程池,須要執行任務的時候就從線程池中拿一個線程來執行。而ForkJoinPool則不單單是這麼簡單,一樣也不是ThreadPoolExecutor的代替品,這種線程池是爲了實現「 分治法」這一思想而建立的,經過把大任務拆分紅小任務,而後再把小任務的結果彙總起來就是最終的結果,和MapReduce的思想很相似舉個例子,咱們要統計1-100的累加和,若是使用ForkJoinPool來實現的話,就能夠將1-100每5位劃分一段,劃分出20段,看成20個任務,每一個任務只計算本身區間內的結果,最後將這20個任務的結果彙總起來就是1-100的累加和bash
ForkJoinPool的本質就是兩點:併發
接來下咱們來作一個1-100的累加例子:異步
class Task extends RecursiveTask<Integer> {
private int start;
private int end;
private int mid;
public Task(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start < 6) {
// 當任務很小時,直接進行計算
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
} else {
// 不然,將任務進行拆分
mid = (end - start) / 2 + start;
Task left = new Task(start, mid);
Task right = new Task(mid + 1, end);
// 執行上一步拆分的子任務
left.fork();
right.fork();
// 拿到子任務的執行結果
sum += left.join();
sum += right.join();
}
return sum;
}
}
複製代碼
這裏的RecursiveTask
是ForkJoinTask
的子類,ForkJoinTask
又是Future
的子類,不瞭解Future類的能夠認爲是一個異步執行,而且能夠有返回值的Runnable類ide
咱們首先在Task類中定義了任務須要的一些數據,好比開始位置和結束位置。重點是其中的compute方法,在其中實現了咱們剛纔說到的步驟,若是任務很小(經過任務數據來判斷),就進行計算,不然將任務拆分,使用fork()執行,並經過join()拿到計算結果高併發
剛纔咱們定義了任務類,接下來就須要把這個任務提交到線程池:ui
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Task countTask = new Task(1, 100);
ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);
System.out.println("result: " + result.get());
forkJoinPool.shutdown();
}
複製代碼
注意,這裏ForkJoinPool初始化能夠傳入一個並行參數,若是不傳入該參數的話會默認使用處理器個數來做爲並行參數this
建立任務對象和線程池以後,使用submit方法來提交任務,該方法會返回一個ForkJoinTask<T>
類型的對象,調用其get方法便可拿到執行結果spa
同時要注意,該線程池也須要調用shutdown方法來關閉
ForkJoinPool中有三個重要角色:
在線程池中,任務隊列使用數組來保存,其中保存了全部提交進來的任務:
submission
指的是本地提交的任務,如submit、execute提交的任務;而task
則是經過fork方法添加的子任務。這兩種任務僅僅在含義上有所區別,因此一同保存在任務隊列中,經過位置進行區分
想理解ForkJoinPool的原理,就要理解其核心,一共有兩點,其一是分治法,其二就是工做竊取算法。分治法相信就不用多說了,就是經過把大任務拆分紅小任務來提升併發度。重點要說的就是工做竊取算法,該算法的原理:
全部線程均嘗試找到並執行已提交的任務,或是經過其餘任務建立的子任務
依賴於這種特性,來儘可能避免一個線程執行完本身的任務後「無所事事」的狀況。同時,竊取順序是FIFO的