Fork/Join框架原理

Fork/Join是一個用戶並行執行任務的框架,把一個大任務分割成若干個小人物,最終彙總每一個小任務的結果後獲得大任務的結果。java

講到Fork/Join必需要講工做竊取算法。算法

下面講下什麼叫工做竊取算法:某個線程從其餘隊列裏竊取任務來執行,聽名字感受很奇怪,不經想什麼鬼?黑人問號臉。。。。app

咱們來舉個例子:假如咱們須要作一個比較大的任務,能夠把這個任務分割爲若干互不相依的子任務,爲了減小線程間的競爭,把這些子任務放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列對應。這時A線程負責的A隊列裏的任務都處理完了,而其餘隊列裏還有任務等待處理,幹完活的線程若是乾等着就顯得很傻,老闆也不肯意是不(ps:老闆就喜歡壓榨咱們搬磚的),因而A就去其餘線程的隊列裏竊取一個任務來執行。這裏A和B(被竊取的線程)同時訪問一個隊列會出現競爭問題,那麼爲了不這個問題,A(竊取線程)會從雙端隊列的尾部拿任務,B從頭部拿任務。框架

上面講了理論,那麼在執行的時候其實會有幾個問題,好比ide

1.雙端隊列只有一個任務時會出現競爭this

2.該算法會消耗更多的系統資源,好比建立多個線程和多個雙端隊列spa

好處也顯而易見:充分利用線程並行計算,減小線程間的競爭線程

咱們來看代碼若是實現code

ForkJoinTask:ForkJoin任務類,它提供在任務中執行fork和join的操做機制,一般狀況下咱們不須要直接繼承ForkJoinTask類,只須要繼承它的子類就能夠了,目前提供了兩個子類blog

RecursiveAction:用於沒有返回結果的任務

RecursiveTask:用於有返回結果的任務

ForkJoinPool:ForkJoinTask須要經過ForkJoinTask來執行.

下面看一個例子實現:

首先建立一個任務類

public class CountTask  extends RecursiveTask<Integer> {
    private static final int THRESHOLD=2;//閾值
    private  int start;
    private  int end;
    public  CountTask(int start,int end){
        this.start=start;
        this.end=end;
    }

    @Override
    protected Integer compute() {
        int sum=0;
        //若是任務足夠小就計算任務
        boolean canCompute=(end-start)<=THRESHOLD;
        if(canCompute){
            for (int i=start;i<=end;i++){
                sum+=i;
                try {
                    TimeUnit.SECONDS.sleep(1);
                }catch (Exception e){

                }
            }
        }else {
            int middle=(start+end)/2;
            CountTask leftTest=new CountTask(start,middle);
            CountTask rightTest=new CountTask(middle+1,end);
            //執行子任務
            leftTest.fork();
            rightTest.fork();
            //等待子任務執行完,並獲得其結果
            int leftResult=leftTest.join();
            int rightResult=rightTest.join();
            //合併子任務
            sum=leftResult+rightResult;
        }
        return  sum;
    }
}

  調用類:

@RestController
public class ForkJoinTest {

    private int start = 1;
    private int end = 4;

    @GetMapping("ForkJoinTest")
    public void forkjointest() throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一個計算任務,負責計算1+2+3+4
        CountTask test = new CountTask(start, end);
        //執行任務
        Future<Integer> result = forkJoinPool.submit(test);
        System.out.println(result.get());
    }

    @GetMapping("test")
    public void test() throws InterruptedException {
        int sum = 0;
        for (int i = start; i <=end; i++) {
            sum += i;
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(sum);
    }
}

 

這裏寫了兩個方法,一個是經過ForkJoinTask來執行的,一個沒有,咱們執行看下耗時

方法:forkjointest,參數:[Ljava.lang.Object;@6f277213
10
方法:forkjointest--cost:2009mills
方法:test,參數:[Ljava.lang.Object;@a52eb2f
10
方法:test--cost:4001mills
相關文章
相關標籤/搜索