Fork/Join 框架框架使用

一、介紹

Fork/Join 框架是 Java7 提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。在多核計算機中正確使用能夠很好的發揮cpu的做用,提升程序的執行效率。框架採用工做竊取算法,當有子任務線程處理完當前任務時,它會從其餘線程執行的任務隊列裏竊取任務來執行,從而提升總體的執行效率。爲了減小線程間的任務資源競爭,隊列一般使用雙端隊列,別竊取任務線程永遠從啥UN廣大UN隊列的嘔吐不獲取任務執行,而竊取任務的線程永遠從雙端隊列的尾部獲取任務執行。java

二、使用

根據業務場景來考慮是否須要使用Fork/Join框架來進行任務的拆分和彙總操做。當須要時,好比說須要執行一個很大的業務計算之類的,此時使用Fork/Join框架分如下兩步:git

  • 對任務進行分割        把大任務分割成子任務,有可能子任務仍是很大,因此還須要不停的分割,直到分割出的子任務足夠小
  • 執行分割的子任務並彙總結果       分割的子任務分別放在雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據

具體實現以上兩步:github

  1. 建立ForkJoinTask         它提供在任務中執行 fork() 和 join() 操做的機制,一般狀況下咱們不須要直接繼承 ForkJoinTask 類,而只須要繼承它的子類,Fork/Join 框架提供瞭如下兩個子類:   RecursiveAction:用於沒有返回結果的任務   RecursiveTask :用於有返回結果的任務
  2. 使用ForkJoinPool執行ForkJoinTask       任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務

三、示例

3.1 使用fork算法

public class SumTask extends RecursiveTask <Integer>{
    private static final Integer THRESHOLD = 1000;
    private int start;
    private int end;
    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        Integer sum = 0;
        boolean isOk = (end - start) <= THRESHOLD;
        if(isOk) {
            for(int i = start; i <= end; i ++) {
                sum += i;
            }
            return sum;
        }

        int middle = (end + start) / 2;
        //子任務遞歸
        SumTask sumSubTask = new SumTask(start, middle);
        SumTask sumSubTask1 = new SumTask(middle + 1, end);

        //fork子任務
        sumSubTask.fork();
        sumSubTask1.fork();

        //join子任務
        Integer join = sumSubTask.join();
        Integer join1 = sumSubTask1.join();

        sum = join + join1;
        //計算結果
        return sum;
    }
}

3.2 使用invokeAll框架

public class SumTask2 extends RecursiveTask <Integer>{
    private static final Integer THRESHOLD = 1000;
    private int start;
    private int end;
    public SumTask2(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        Integer sum = 0;
        boolean isOk = end - start <= THRESHOLD;
        if(isOk) {
            for(int i = start; i <= end; i ++) {
                sum += i;
            }
//            System.out.println(String.format("compute %d-%d = %d", start, end, sum));
            return sum;
        }

        //除以2
        int middle = (end + start) / 2;
        //子任務遞歸
//        System.out.println(String.format("fork %d-%d => %d-%d&%d-%d", start, end, start, middle - 1, middle, end));
        SumTask2 sumSubTask = new SumTask2(start, middle - 1);
        SumTask2 sumSubTask1 = new SumTask2(middle, end);

        //fork子任務
        invokeAll(sumSubTask, sumSubTask1);

        //join子任務
        Integer join = sumSubTask.join();
        Integer join1 = sumSubTask1.join();

        sum = join + join1;
        //計算結果
        return sum;
    }
}

測試ide

     ForkJoinPool fjp2 = new ForkJoinPool();
        SumTask2 sumTask2 = new SumTask2(start, end);
        long begin3 = System.currentTimeMillis();
        Integer invoke = fjp2.invoke(sumTask2);
        long end3 = System.currentTimeMillis();
        System.out.println("計算結果3爲 sum = " + invoke + ",計算時長爲" + begin3 + "-" + end3 + "---  " + (end3 - begin3) + "ms");

        ForkJoinPool fjp = new ForkJoinPool();
        long begin2 = System.currentTimeMillis();
        SumTask sumTask = new SumTask(start, end);
        ForkJoinTask<Integer> submit = fjp.submit(sumTask);
        Integer join = submit.join();
        long end2 = System.currentTimeMillis();
        System.out.println("計算結果2爲 sum = " + join + ",計算時長爲" + begin2 + "-" + end2 + "---   " + (end2 - begin2) + "ms");
    

結果測試

從結果(能夠屢次運行測試)能夠看出,使用invokeAll方式效率比使用單獨fork方式高,因此在使用時儘可能採用invokeAll方式,這樣能夠充分利用線程池中的線程去執行任務。this

 

源碼參照Github

相關文章
相關標籤/搜索