Java中J.U.C擴展組件之Fork,join

Fork/join介紹

Fork/join框架是 java7提供的並行執行任務的框架,是把大任務分割成若干小任務,最後彙總若干小任務的執行結果獲得最終的結果。它的思想與 MapReduce相似。 Fork把一個大任務分割成若干小任務, Join用於合併小任務的結果,最後獲得大框架的結果。主要採起工做竊取算法。

工做竊取(work-stealing)算法是指某個線程從其它隊列竊取任務執行。java

img

假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。算法

對於Fork/Join框架而言,當一個任務正在等待它使用Join操做建立的子任務結束時,執行這個任務的工做線程,尋找其餘並未被執行的任務,並開始執行,經過這種方式,線程充分利用它們的運行時間,來提升應用程序的性能。爲了實現這個目標,Fork/Join框架執行的任務有一些侷限性:微信

  • 任務只能使用Fork、Join操做來做爲同步機制,若是使用了其餘同步機制,那他們在同步操做時,工做線程則不能執行其餘任務。如:在框架的操做中,使任務進入睡眠,那麼在這個睡眠期間內,正在執行這個任務的工做線程,將不會執行其餘任務
  • 所執行的任務,不該該執行IO操做,如讀和寫數據文件
  • 任務不能拋出檢查型異常,必須經過必要的代碼處理它們
核心是兩個類: ForkJoinTaskForkJoinPool。Pool主要負責實現,包括上面所介紹的工做竊取算法,管理工做線程和提供關於任務的狀態以及它們的執行信息;Task主要提供在任務中,執行Fork與Join操做的機制。

img

Fork/join代碼演示

package com.rumenz.task;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;


public class ForkJoinExample extends RecursiveTask<Integer> {

    public final static int threshold=2;
    private int start;
    private int end;

    public ForkJoinExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum=0;
        boolean b = (end - start) <= threshold;
        if(b){
            //任務足夠小的時候,直接計算,不進行分裂計算
            for (int i = start; i <=end ; i++) {
                sum+=i;
            }
        }else{
            int mid=(start+end)/2;
            //繼續分裂任務
            ForkJoinExample task1=new ForkJoinExample(start,mid);
            ForkJoinExample task2=new ForkJoinExample(mid+1,end);

            // 執行子任務
            task1.fork();
            task2.fork();

            // 等待任務執行結束合併其結果
            Integer m = task1.join();
            Integer n = task2.join();
            sum=m+n;

        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //生成一個池
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        
        ForkJoinTask task=new ForkJoinExample(1, 100000);
        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
        Integer sum = submit.get();
        System.out.println("最後的結果是:"+sum);

    }
}
經過這個例子讓咱們再來進一步瞭解ForkJoinTask,任務類繼承RecursiveTask,ForkJoinTask與通常的任務的主要區別在於它須要實現compute()方法,在這個方法裏,首先須要判斷任務是否足夠小,若是足夠小就直接執行任務。若是不足夠小,就必須分割成兩個子任務,每一個子任務在調用fork()方法時,又會進入compute()方法,看看當前子任務是否須要繼續分割成孫任務,若是不須要繼續分割,則執行當前子任務並返回結果。使用join()方法會等待子任務執行完並獲得其結果。

關注微信公衆號:【入門小站】,解鎖更多知識點框架

相關文章
相關標籤/搜索