併發編程之fork/join(分而治之)

1.什麼是分而治之

分而治之就是將一個大任務層層拆分紅一個個的小任務,直到不可拆分,拆分依據定義的閾值劃分任務規模。數組

fork/join經過fork將大任務拆分紅小任務,在將小任務的結果join彙總框架

2.fork/join標準範式

先上圖異步

在使用fork/join作任務分配以前,首先得了解其中的幾個類:ide

ForkJoinPool:充當fork/join框架裏面的管理者,最原始的任務都要交給它才能處理。它負責控制整個fork/join有多少個workerThread,workerThread的建立,激活都是由它來掌控。它還負責workQueue隊列的建立和分配,每當建立一個workerThread,它負責分配相應的workQueue。而後它把接到的活都交給workerThread去處理,它能夠說是整個frok/join的容器。this

        ForkJoinWorkerThread:fork/join裏面真正幹活的"工人",本質是一個線程。裏面有一個ForkJoinPool.WorkQueue的隊列存放着它要乾的活,接活以前它要向ForkJoinPool註冊(registerWorker),拿到相應的workQueue。而後就從workQueue裏面拿任務出來處理。它是依附於ForkJoinPool而存活,若是ForkJoinPool的銷燬了,它也會跟着結束。spa

 ForkJoinPool.WorkQueue: 雙端隊列就是它,它負責存儲接收的任務。線程

ForkJoinTask:表明fork/join裏面任務類型,咱們通常用它的兩個子類RecursiveTask、RecursiveAction。這兩個區別在於RecursiveTask任務是有返回值,RecursiveAction沒有返回值。任務的處理邏輯包括任務的切分都集中在compute()方法裏面。code

 

3.廢話很少說,代碼走起

  fork/join在平時的使用過程當中,通常分爲同步調用和異步調用,下面是兩種狀況的實例:blog


/**同步用法*/
1 public class SumArray { 2 private static class SumTask extends RecursiveTask<Integer>{ 3 4 private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; 5 private int[] src; //表示咱們要實際統計的數組 6 private int fromIndex;//開始統計的下標 7 private int toIndex;//統計到哪裏結束的下標 8 9 public SumTask(int[] src, int fromIndex, int toIndex) { 10 this.src = src; 11 this.fromIndex = fromIndex; 12 this.toIndex = toIndex; 13 } 14 15 @Override 16 protected Integer compute() { 17 if(toIndex-fromIndex < THRESHOLD) { 18 int count = 0; 19 for(int i=fromIndex;i<=toIndex;i++) { 20 //SleepTools.ms(1); 21 count = count + src[i]; 22 } 23 return count; 24 }else { 25 //fromIndex....mid....toIndex 26 //1...................70....100 27 int mid = (fromIndex+toIndex)/2;
//將任務一分爲二
28 SumTask left = new SumTask(src,fromIndex,mid); 29 SumTask right = new SumTask(src,mid+1,toIndex); 30 invokeAll(left,right); //提交任務 31 return left.join()+right.join(); 32 } 33 } 34 } 35 36 public static void main(String[] args) { 37 38 ForkJoinPool pool = new ForkJoinPool(); 39 int[] src = MakeArray.makeArray(); 40 41 SumTask innerFind = new SumTask(src,0,src.length-1); 42 43 long start = System.currentTimeMillis(); 44 45 int a = pool.invoke(innerFind);//同步調用 46 System.out.println("Task is Running....."); 47 System.out.println("The count is "+innerFind.join() 48 +" spend time:"+(System.currentTimeMillis()-start)+"ms"+a); 49 } 50 }

 

/**
 *異步用法
 *
 *類說明:遍歷指定目錄(含子目錄)找尋指定類型文件
 */
public class FindDirsFiles extends RecursiveAction{

    private File path;//當前任務須要搜尋的目錄

    public FindDirsFiles(File path) {
        this.path = path;
    }

    public static void main(String [] args){
        try {
            // 用一個 ForkJoinPool 實例調度總任務
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            pool.execute(task);//異步調用

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork="+otherWork);
            task.join();//阻塞的方法
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void compute() {
        
        List<FindDirsFiles> subTasks = new ArrayList<>();
        
        File[] files = path.listFiles();
        if(files!=null) {
            for(File file:files) {
                if(file.isDirectory()) {
                    subTasks.add(new FindDirsFiles(file));
                }else {
                    //遇到文件,檢查
                    if(file.getAbsolutePath().endsWith("txt")) {
                        System.out.println("文件:"+file.getAbsolutePath());
                    }
                }
            }
            if(!subTasks.isEmpty()) {
                for(FindDirsFiles subTask:invokeAll(subTasks)) {
                    subTask.join();//等待子任務執行完成
                }
            }
        }


        
    }
}

 

 這段代碼能夠直接運行試試,跟上面的標準範式同樣,在這裏我是實現了RecursiveTask(有興趣的能夠本身改爲RecursiveAction玩玩,可是RecursiveAction是沒有返回值的,使用的時候須要注意),當調用ForkJoinPool的invoke方法啓動任務,會同步調用重寫的compute方法,這個方法裏面纔是你要寫的fork/join業務代碼。遞歸

能夠看到,我定義了一個閾值THRESHOLD,當任務小於這個閾值的時候,執行運算,不然繼續切分任務,提交任務,循環調用,直到任務不可切分,將全部的運算結果整合。其實我在調用invokeAll方法時,並不會馬上返回結果,裏面仍是會去重複判斷每個任務是否小於閾值,當全部的任務都知足條件並執行完成,纔會返回,其實就是遞歸調用。

 

總結:

    fork/join的使用其實沒什麼難度,其基本思想是將大任務分割成小任務,最後將小任務聚合起來獲得結果。fork是分解的意思, join是收集的意思. 它很是相似於HADOOP提供的MapReduce框架,只是MapReduce的任務能夠針對集羣內的全部計算節點,能夠充分利用集羣的能力完成計算任務。ForkJoin更加相似於單機版的MapReduce。

相關文章
相關標籤/搜索