分而治之Fork-Join 框架

什麼是 Fork-Join

Fork/Join框架是Java7提供了的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架,這種開發方法也叫分治編程。分治編程能夠極大地利用CPU資源,提升任務執行的效率,也是目前與多線程有關的前沿技術。java

傳統的分治編程會遇到什麼問題

分治的原理上面已經介紹了,就是切割大任務成小任務來完成。看起來好像也不難實現啊!爲何專門弄一個新的框架呢?算法

咱們先看一下,在不使用 Fork-Join 框架時,使用普通的線程池是怎麼實現的。編程

  1. 咱們往一個線程池提交了一個大任務,規定好任務切割的閥值。
  2. 由線程池中線程(假設是線程A)執行大任務,發現大任務的大小大於閥值,因而切割成兩個子任務,並調用 submit() 提交到線程池,獲得返回的子任務的 Future。
  3. 線程A就調用 返回的 Future 的 get() 方法阻塞等待子任務的執行結果。
  4. 池中的其餘線程(除線程A外,線程A被阻塞)執行兩個子任務,而後判斷子任務的大小有沒有超過閥值,若是超過,則按照步驟2繼續切割,不然,才計算並返回結果。

咦,好像一切都很美好。真的嗎?別忘了, 每個切割任務的線程(如線程A)都被阻塞了,直到其子任務完成,才能繼續往下運行 。若是任務太大了,須要切割屢次,那麼就會有多個線程被阻塞,性能將會急速降低。更糟糕的是,若是你的線程池的線程數量是有上限的,很可能會形成池中全部線程被阻塞,線程池沒法執行任務。多線程

普通線程池實現分治時阻塞的問題

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //建立一個計算任務,計算 由1加到12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("最終的計算結果:" + future.get());
    }
}

class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;
    private int start;
    private int end;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;

        //任務已經足夠小,能夠直接計算,並返回結果
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("執行計算任務,計算 " + start + "到 " + end + "的和 ,結果是:" + sum + " 執行此任務的線程:" + Thread.currentThread().getName());
            return sum;
        } else { //任務過大,須要切割
            System.out.println("任務過大,切割的任務: " + start + "加到 " + end + "的和 執行此任務的線程:" + Thread.currentThread().getName());
            int middle = (start + end) / 2;
            //切割成兩個子任務
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //執行子任務
            leftTask.fork();
            rightTask.fork();
            //等待子任務的完成,並獲取執行結果
            invokeAll(leftTask,rightTask);
// int leftResult = leftTask.join();
// int rightResult = rightTask.join();
            //合併子任務
// sum = leftResult + rightResult;
// return sum;
            return leftTask.join()+rightTask.join();
        }

    }
}
複製代碼

運行結果:併發

切割的任務:1加到10   執行此任務的線程是 pool-1-thread-1
切割的任務:1加到5   執行此任務的線程是 pool-1-thread-2
切割的任務:6加到10   執行此任務的線程是 pool-1-thread-3
複製代碼

池的線程只有三個,當任務分割了三次後,池中的線程也就都被阻塞了,沒法再執行任何任務,一直卡着動不了。框架

爲了解決這個問題,工做竊取算法呼之欲出異步

工做竊取算法

針對上面的問題,Fork-Join 框架使用了「工做竊取(work-stealing)」算法。工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。在《Java 併發編程的藝術》對工做竊取算法的解釋:ide

使用工做竊取算法有什麼優點呢?假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。性能

Fork-Join 框架使用工做竊取算法

  1. Fork-Join 框架的線程池ForkJoinPool 的任務分爲「外部任務」 和 「內部任務」。
  2. 「外部任務」是放在 ForkJoinPool 的全局隊列裏;
  3. ForkJoinPool 池中的每一個線程都維護着一個內部隊列,用於存放「內部任務」。
  4. 線程切割任務獲得的子任務就會做爲「內部任務」放到內部隊列中。
  5. 當此線程要想要拿到子任務的計算結果時,先判斷子任務有沒有完成,若是沒有完成,則再判斷子任務有沒有被其餘線程「竊取」,一旦子任務被竊取了則去執行本線程「內部隊列」的其餘任務,或者掃描其餘的任務隊列,竊取任務,若是子任務沒有被竊取,則由本線程來完成。
  6. 最後,當線程完成了其「內部任務」,處於空閒的狀態時,就會去掃描其餘的任務隊列,竊取任務

工做竊取算法的優勢

Fork-Join 框架中的工做竊取算法的優勢能夠總結爲如下兩點:this

  1. 線程是不會由於等待某個子任務的完成或者沒有內部任務要執行而被阻塞等待、掛起,而是會掃描全部的隊列,竊取任務,直到全部隊列都爲空時,纔會被掛起。
  2. Fork-Join 框架在多CPU的環境下,能提供很好的並行性能。在使用普通線程池的狀況下,當CPU再也不是性能瓶頸時,能並行地運行多個線程,然而卻由於要互斥訪問一個任務隊列而致使性能提升不上去。而 Fork-Join 框架爲每一個線程爲維護着一個內部任務隊列,以及一個全局的任務隊列,並且任務隊列都是雙向隊列,可從首尾兩端來獲取任務,極大地減小了競爭的可能性,提升並行的性能。

Fork-Join 框架的使用介紹

Fork/Join有三個核心類:

  1. ForkJoinPool: 執行任務的線程池,繼承了 AbstractExecutorService 類。
  2. ForkJoinWorkerThread: 執行任務的工做線程(即ForkJoinPool線程池裏的線程)。每一個線程都維護着一個內部隊列,用於存放「內部任務」。繼承了 Thread 類。
  3. ForkJoinTask: 一個用於ForkJoinPool的任務抽象類。實現了 Future 接口

由於ForkJoinTask比較複雜,抽象方法比較多,平常使用時通常不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類,實現 compute() 方法:

RecursiveTask: 子任務帶返回結果時使用
RecursiveAction: 子任務不帶返回結果時使用
複製代碼

compute 方法的實現模式通常是:

if 任務足夠小
    直接返回結果
else
    分割成N個子任務
    依次調用每一個子任務的fork方法執行子任務
    依次調用每一個子任務的join方法合併執行結果
複製代碼

Fork-Join 例子演示

計算 1+2+....+12 的結果。

使用Fork/Join框架首先要考慮到的是如何分割任務,若是咱們但願每一個子任務最多執行兩個數的相加,那麼咱們設置分割的閾值是2,因爲是12個數字相加。同時,觀察執行任務的線程名稱,理解工做竊取算法的實現。

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //建立一個計算任務,計算 由1加到12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("最終的計算結果:" + future.get());
    }
}

class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;
    private int start;
    private int end;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;

        //任務已經足夠小,能夠直接計算,並返回結果
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("執行計算任務,計算 " + start + "到 " + end + "的和 ,結果是:" + sum + " 執行此任務的線程:" + Thread.currentThread().getName());

        } else { //任務過大,須要切割
            System.out.println("任務過大,切割的任務: " + start + "加到 " + end + "的和 執行此任務的線程:" + Thread.currentThread().getName());
            int middle = (start + end) / 2;
            //切割成兩個子任務
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //執行子任務
            leftTask.fork();
            rightTask.fork();
            //等待子任務的完成,並獲取執行結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
}

複製代碼

運行結果:

任務過大,切割的任務: 1加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-1
任務過大,切割的任務: 7加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-3
任務過大,切割的任務: 1加到 6的和 執行此任務的線程:ForkJoinPool-1-worker-2
執行計算任務,計算 7到 9的和 ,結果是:24 執行此任務的線程:ForkJoinPool-1-worker-3
執行計算任務,計算 1到 3的和 ,結果是:6 執行此任務的線程:ForkJoinPool-1-worker-1
執行計算任務,計算 4到 6的和 ,結果是:15 執行此任務的線程:ForkJoinPool-1-worker-1
執行計算任務,計算 10到 12的和 ,結果是:33 執行此任務的線程:ForkJoinPool-1-worker-3
最終的計算結果:78
複製代碼

從結果能夠看出,提交的計算任務是由線程1執行,線程1進行了第一次切割,切割成兩個子任務 「7加到12「 和 」1加到6「,並提交這兩個子任務。而後這兩個任務便被 線程二、線程3 給竊取了。線程1 的內部隊列中已經沒有任務了,這時候,線程二、線程3 也分別進行了一次任務切割並各自提交了兩個子任務,因而線程1也去竊取任務(這裏竊取的都是線程2的子任務)。

RecursiveAction 演示

遍歷指定目錄(含子目錄)找尋指定類型文件

public class FindDirsFiles extends RecursiveAction{

    /** * 當前任務須要搜尋的目錄 */
    private File path;

    public FindDirsFiles(File path) {
        this.path = path;
    }

    public static void main(String [] args){
        try {
            // 用一個 ForkJoinPool 實例調度總任務
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("D:/"));

            //異步調用
            pool.execute(task);

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<1000000;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork=" + otherWork);
            //阻塞的方法
            task.join();
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

	@Override
	protected void compute() {
		
		List<FindDirsFiles> subTasks = new ArrayList<>();
		
		File[] files = path.listFiles();
		if(files!=null) {
			for(File file:files) {
				if(file.isDirectory()) {
					subTasks.add(new FindDirsFiles(file));
				}else {
					//遇到文件,檢查
					if(file.getAbsolutePath().endsWith("txt")) {
						System.out.println("文件:"+file.getAbsolutePath());
					}
				}
			}
			if(!subTasks.isEmpty()) {
                for (FindDirsFiles subTask : invokeAll(subTasks)) {
                    //等待子任務執行完成
                    subTask.join();
                }
			}
		}


		
	}
}

複製代碼
相關文章
相關標籤/搜索