Fork-join框架

這是一個JDK7引入的並行框架,它把流程劃分紅fork(分解)+join(合併)兩個步驟(怎麼那麼像MapReduce?),傳統線程池來實現一個並行任務的時候,常常須要花費大量的時間去等待其餘線程執行任務的完成,可是fork-join框架使用work stealing技術緩解了這個問題:java

  1. 每一個工做線程都有一個雙端隊列,當分給每一個任務一個線程去執行的時候,這個任務會放到這個隊列的頭部;
  2. 當這個任務執行完畢,須要和另一個任務的結果執行合併操做,但是那個任務卻沒有執行的時候,不會幹等,而是把另外一個任務放到隊列的頭部去,讓它儘快執行;
  3. 當工做線程的隊列爲空,它會嘗試從其餘線程的隊列尾部偷一個任務過來;
  4. 取得的任務能夠被進一步分解。
  • ForkJoinPool.class,ForkJoin框架的任務池,ExecutorService的實現類
  • ForkJoinTask.class,Future的子類,框架任務的抽象
  • ForkJoinWorkerThread.class,工做線程
  • RecursiveTask.class,ForkJoinTask的實現類,compute方法有返回值,下文中有例子
  • RecursiveAction.class,ForkJoinTask的實現類,compute方法無返回值,只須要覆寫compute方法,對於可繼續分解的子任務,調用coInvoke方法完成(參數是RecursiveAction子類對象的可變數組):

  依靠應用程序自己並行拆封任務,若是使用簡單的多線程程序的方法,複雜度必然很大。這就須要一個更好的範式或者工具來代程序員處理這類問題。Java 7也意識到了這個問題,才標準庫中集成了由Doug Lea開發的Fork/Join並行計算框架。經過使用 Fork/Join 模式,軟件開發人員可以方便地利用多核平臺的計算能力。儘管尚未作到對軟件開發人員徹底透明,Fork/Join 模式已經極大地簡化了編寫併發程序的瑣碎工做。對於符合 Fork/Join 模式的應用,軟件開發人員再也不須要處理各類並行相關事務,例如同步、通訊等,以難以調試而聞名的死鎖和 data race 等錯誤也就不會出現,提高了思考問題的層次。你能夠把 Fork/Join 模式看做並行版本的 Divide and Conquer 策略,僅僅關注如何劃分任務和組合中間結果,將剩下的事情丟給 Fork/Join 框架。可是Fork/Join並行計算框架,並非銀彈,並不能解決全部應用程序在超多核心處理器上的併發問題。程序員

    若是一個應用能被分解成多個子任務,而且組合多個子任務的結果就可以得到最終的答案,那麼這個應用就適合用 Fork/Join 模式來解決。其原理以下圖。
數組

    應用程序開發者須要作的就是拆分任務並組合每一個子任務的中間結果,而不用再考慮線程和鎖的問題。多線程

一個簡單的例子

咱們首先看一個簡單的Fork/Join的任務定義。併發

Java代碼 複製代碼  收藏代碼
  1. public class Calculator extends RecursiveTask<Integer> {   
  2.   
  3.     private static final int THRESHOLD = 100;   
  4.     private int start;   
  5.     private int end;   
  6.   
  7.     public Calculator(int start, int end) {   
  8.         this.start = start;   
  9.         this.end = end;   
  10.     }   
  11.   
  12.     @Override  
  13.     protected Integer compute() {   
  14.         int sum = 0;   
  15.         if((start - end) < THRESHOLD){   
  16.             for(int i = start; i< end;i++){   
  17.                 sum += i;   
  18.             }   
  19.         }else{   
  20.             int middle = (start + end) /2;   
  21.             Calculator left = new Calculator(start, middle);   
  22.             Calculator right = new Calculator(middle + 1, end);   
  23.             left.fork();   
  24.             right.fork();   
  25.   
  26.             sum = left.join() + right.join();   
  27.         }   
  28.         return sum;   
  29.     }   
  30.   
  31. }  
public class Calculator extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 100;
    private int start;
    private int end;

    public Calculator(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if((start - end) < THRESHOLD){
            for(int i = start; i< end;i++){
                sum += i;
            }
        }else{
            int middle = (start + end) /2;
            Calculator left = new Calculator(start, middle);
            Calculator right = new Calculator(middle + 1, end);
            left.fork();
            right.fork();

            sum = left.join() + right.join();
        }
        return sum;
    }

}

 

    這段代碼中,定義了一個累加的任務,在compute方法中,判斷當前的計算範圍是否小於一個值,若是是則計算,若是沒有,就把任務拆分爲連個子任務,併合並連個子任務的中間結果。程序遞歸的完成了任務拆分和計算。框架

    任務定義以後就是執行任務,Fork/Join提供一個和Executor框架 的擴展線程池來執行任務。dom

Java代碼 複製代碼  收藏代碼
  1. @Test  
  2. public void run() throws Exception{   
  3.     ForkJoinPool forkJoinPool = new ForkJoinPool();   
  4.     Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));   
  5.   
  6.     assertEquals(new Integer(49995000), result.get());   
  7. }  
    @Test
    public void run() throws Exception{
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));

        assertEquals(new Integer(49995000), result.get());
    }

 

Fork/Join框架的主要類


RecursiveAction供不須要返回值的任務繼續。ide

RecursiveTask經過泛型參數設置計算的返回值類型。工具

ForkJoinPool提供了一系列的submit方法,計算任務。ForkJoinPool默認的線程數經過Runtime.availableProcessors()得到,由於在計算密集型的任務中,得到多於處理性核心數的線程並不能得到更多性能提高。性能

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    doSubmit(task);
    return task;
}

sumit方法返回了task自己,ForkJoinTask實現了Future接口,因此能夠經過它等待得到結果。

另外一例子

這個例子並行排序數組,不須要返回結果,因此繼承了RecursiveAction。

Java代碼 複製代碼  收藏代碼
  1. public class SortTask extends RecursiveAction {   
  2.     final long[] array;   
  3.     final int start;   
  4.     final int end;   
  5.     private int THRESHOLD = 100; //For demo only   
  6.   
  7.     public SortTask(long[] array) {   
  8.         this.array = array;   
  9.         this.start = 0;   
  10.         this.end = array.length - 1;   
  11.     }   
  12.   
  13.     public SortTask(long[] array, int start, int end) {   
  14.         this.array = array;   
  15.         this.start = start;   
  16.         this.end = end;   
  17.     }   
  18.   
  19.     protected void compute() {   
  20.         if (end - start < THRESHOLD)   
  21.             sequentiallySort(array, start, end);   
  22.         else {   
  23.             int pivot = partition(array, start, end);   
  24.             new SortTask(array, start, pivot - 1).fork();   
  25.             new SortTask(array, pivot + 1, end).fork();   
  26.         }   
  27.     }   
  28.   
  29.     private int partition(long[] array, int start, int end) {   
  30.         long x = array[end];   
  31.         int i = start - 1;   
  32.         for (int j = start; j < end; j++) {   
  33.             if (array[j] <= x) {   
  34.                 i++;   
  35.                 swap(array, i, j);   
  36.             }   
  37.         }   
  38.         swap(array, i + 1, end);   
  39.         return i + 1;   
  40.     }   
  41.   
  42.     private void swap(long[] array, int i, int j) {   
  43.         if (i != j) {   
  44.             long temp = array[i];   
  45.             array[i] = array[j];   
  46.             array[j] = temp;   
  47.         }   
  48.     }   
  49.   
  50.     private void sequentiallySort(long[] array, int lo, int hi) {   
  51.         Arrays.sort(array, lo, hi + 1);   
  52.     }   
  53. }  
public class SortTask extends RecursiveAction {
    final long[] array;
    final int start;
    final int end;
    private int THRESHOLD = 100; //For demo only

    public SortTask(long[] array) {
        this.array = array;
        this.start = 0;
        this.end = array.length - 1;
    }

    public SortTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    protected void compute() {
        if (end - start < THRESHOLD)
            sequentiallySort(array, start, end);
        else {
            int pivot = partition(array, start, end);
            new SortTask(array, start, pivot - 1).fork();
            new SortTask(array, pivot + 1, end).fork();
        }
    }

    private int partition(long[] array, int start, int end) {
        long x = array[end];
        int i = start - 1;
        for (int j = start; j < end; j++) {
            if (array[j] <= x) {
                i++;
                swap(array, i, j);
            }
        }
        swap(array, i + 1, end);
        return i + 1;
    }

    private void swap(long[] array, int i, int j) {
        if (i != j) {
            long temp = array[i];
            array[i] = array[j];
            array[j] = temp;
        }
    }

    private void sequentiallySort(long[] array, int lo, int hi) {
        Arrays.sort(array, lo, hi + 1);
    }
}

 

Java代碼 複製代碼  收藏代碼
  1. @Test  
  2. public void run() throws InterruptedException {   
  3.     ForkJoinPool forkJoinPool = new ForkJoinPool();   
  4.     Random rnd = new Random();   
  5.     long[] array = new long[SIZE];   
  6.     for (int i = 0; i < SIZE; i++) {   
  7.         array[i] = rnd.nextInt();   
  8.     }   
  9.     forkJoinPool.submit(new SortTask(array));   
  10.   
  11.     forkJoinPool.shutdown();   
  12.     forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);   
  13.   
  14.     for (int i = 1; i < SIZE; i++) {   
  15.         assertTrue(array[i - 1] < array[i]);   
  16.     }   
  17. }  
相關文章
相關標籤/搜索