淺談ForkJoinPool

ForkJoinPool是什麼?

談到線程池,不少人會想到Executors提供的一些預設的線程池,好比單線程線程池SingleThreadExecutor,固定大小的線程池FixedThreadPool,可是不多有人會注意到其中還提供了一種特殊的線程池:WorkStealingPool,咱們點進這個方法,會看到和其餘方法不一樣的是,這種線程池並非經過ThreadPoolExecutor來建立的,而是ForkJoinPool來建立的:算法

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
複製代碼

這兩種線程池之間並非繼承關係,而是平級關係:數組

ThreadPoolExecutor應該都很瞭解了,就是一個基本的存儲線程的線程池,須要執行任務的時候就從線程池中拿一個線程來執行。而ForkJoinPool則不單單是這麼簡單,一樣也不是ThreadPoolExecutor的代替品,這種線程池是爲了實現「 分治法」這一思想而建立的,經過把大任務拆分紅小任務,而後再把小任務的結果彙總起來就是最終的結果,和MapReduce的思想很相似

舉個例子,咱們要統計1-100的累加和,若是使用ForkJoinPool來實現的話,就能夠將1-100每5位劃分一段,劃分出20段,看成20個任務,每一個任務只計算本身區間內的結果,最後將這20個任務的結果彙總起來就是1-100的累加和bash

ForkJoinPool怎麼使用?

ForkJoinPool的本質就是兩點:併發

  1. 若是任務很小:直接計算得出結果
  2. 若是任務很大
    • 拆分紅N個子任務
    • 調用子任務的fork()進行計算
    • 調用子任務的join()合併結果

接來下咱們來作一個1-100的累加例子:異步

  1. 首先定義咱們須要執行的任務:
class Task extends RecursiveTask<Integer> {

    private int start;

    private int end;
    private int mid;

    public Task(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < 6) {
            // 當任務很小時,直接進行計算
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
        } else {
            // 不然,將任務進行拆分
            mid = (end - start) / 2 + start;
            Task left = new Task(start, mid);
            Task right = new Task(mid + 1, end);

            // 執行上一步拆分的子任務
            left.fork();
            right.fork();

            // 拿到子任務的執行結果
            sum += left.join();
            sum += right.join();
        }

        return sum;
    }
}
複製代碼

這裏的RecursiveTaskForkJoinTask的子類,ForkJoinTask又是Future的子類,不瞭解Future類的能夠認爲是一個異步執行,而且能夠有返回值的Runnable類ide

咱們首先在Task類中定義了任務須要的一些數據,好比開始位置和結束位置。重點是其中的compute方法,在其中實現了咱們剛纔說到的步驟,若是任務很小(經過任務數據來判斷),就進行計算,不然將任務拆分,使用fork()執行,並經過join()拿到計算結果高併發

  1. 將任務提交到線程池

剛纔咱們定義了任務類,接下來就須要把這個任務提交到線程池:ui

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        Task countTask = new Task(1, 100);
        ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);

        System.out.println("result: " + result.get());

        forkJoinPool.shutdown();
    }
複製代碼

注意,這裏ForkJoinPool初始化能夠傳入一個並行參數,若是不傳入該參數的話會默認使用處理器個數來做爲並行參數this

建立任務對象和線程池以後,使用submit方法來提交任務,該方法會返回一個ForkJoinTask<T>類型的對象,調用其get方法便可拿到執行結果spa

同時要注意,該線程池也須要調用shutdown方法來關閉

ForkJoinPool的原理

ForkJoinPool中有三個重要角色:

  • ForkJoinWorkerThread:工做線程,在內部對Thread進行的封裝
  • WorkQueue:任務隊列
  • ForkJoinTask:任務,繼承自Future,在含義上分爲submission和task兩種

在線程池中,任務隊列使用數組來保存,其中保存了全部提交進來的任務:

  1. 奇數位置保存submission
  2. 偶數位置保存task

submission指的是本地提交的任務,如submit、execute提交的任務;而task則是經過fork方法添加的子任務。這兩種任務僅僅在含義上有所區別,因此一同保存在任務隊列中,經過位置進行區分

ForkJoinPool的核心

想理解ForkJoinPool的原理,就要理解其核心,一共有兩點,其一是分治法,其二就是工做竊取算法。分治法相信就不用多說了,就是經過把大任務拆分紅小任務來提升併發度。重點要說的就是工做竊取算法,該算法的原理:

全部線程均嘗試找到並執行已提交的任務,或是經過其餘任務建立的子任務

依賴於這種特性,來儘可能避免一個線程執行完本身的任務後「無所事事」的狀況。同時,竊取順序是FIFO的

相關文章
相關標籤/搜索