Java的Fork/Join框架

       當咱們須要執行大量的小任務時,有經驗的Java開發人員都會採用線程池來高效執行這些小任務。然而,有一種任務,例如,對超過1000萬個元素的數組進行排序,這種任務自己能夠併發執行,但如何拆解成小任務須要在任務執行的過程當中動態拆分。這樣,大任務能夠拆成小任務,小任務還能夠繼續拆成更小的任務,最後把任務的結果彙總合併,獲得最終結果,這種模型就是Fork/Join模型。java

Java7引入了Fork/Join框架,咱們經過RecursiveTask這個類就能夠方便地實現Fork/Join模式。數組

 

下面爲一個Fork/Join例子:併發

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Java7引入了Fork/Join框架,咱們經過RecursiveTask這個類就能夠方便地實現Fork/Join模式。
 * 如:對一個大數組進行並行求和的RecursiveTask
 */
public class SumTask extends RecursiveTask<Long>{

    static final int THRESHOLD = 100;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {

        System.out.println("當前執行任務的線程name="+Thread.currentThread().getName());
        //先判斷任務是否是足夠小,若是足夠小,就直接計算並返回結果(注意模擬了1秒延時)
        if (end - start <= THRESHOLD) {
            // 若是任務足夠小,直接計算:
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(String.format("compute %d~%d = %d", start, end, sum));
            return sum;
        }
        // 任務太大,一分爲二:
        int middle = (end + start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
//        subtask1.fork();
//        subtask2.fork();
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }

    public static void main(String[] args) throws Exception {
        // 建立隨機數組成的數組:
        long[] array = new long[400];
        for(int i=0;i<400;i++){
            array[i]= i;
        }
        // fork/join task:
        ForkJoinPool fjp = new ForkJoinPool(4); // 最大併發數4
        ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
        long startTime = System.currentTimeMillis();
        Long result = fjp.invoke(task);//提交一個Fork/Join任務併發執行,而後得到異步執行的結果
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
    }
}

 

    注:invokeAll(subtask1, subtask2); 和 subtask1.fork();  subtask2.fork(); 是有區別的,查看JDK的invokeAll()方法的源碼就能夠發現,invokeAll的N個任務中,其中N-1個任務會使用fork()交給其它線程執行,可是,它還會留一個任務本身執行,這樣,就充分利用了線程池,保證沒有空閒的不幹活的線程,故爲了節約線程池中的線程利用率,應該用invokeAll()方法框架

相關文章
相關標籤/搜索