java多線程之ForkJoinPool

背景:ForkJoinPool的優點在於,能夠充分利用多cpu,多核cpu的優點,把一個任務拆分紅多個「小任務」,把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。這種思想值得學習。算法

主要參考《瘋狂java講義》數組

使用

Java7 提供了ForkJoinPool來支持將一個任務拆分紅多個「小任務」並行計算,再把多個「小任務」的結果合併成總的計算結果。多線程

ForkJoinPool是ExecutorService的實現類,所以是一種特殊的線程池。dom

使用方法:建立了ForkJoinPool實例以後,就能夠調用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法來執行指定任務了。ide

其中ForkJoinTask表明一個能夠並行、合併的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask表明有返回值的任務,而RecusiveAction表明沒有返回值的任務。函數

下面的UML類圖顯示了ForkJoinPool、ForkJoinTask之間的關係:性能

 

舉例

以還行沒有返回值的「大任務」(簡單低打印1~300的數值)爲例,程序將一個「大任務」拆分紅多個「小任務」,並將任務交給ForkJoinPool來執行學習

複製代碼
/**
 * Project Name:Spring0725
 * File Name:ForkJoinPoolAction.java
 * Package Name:work1201.basic
 * Date:2017年12月4日下午2:26:55
 * Copyright (c) 2017, 深圳金融電子結算中心 All Rights Reserved.
 *
*/

package work1201.basic;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * ClassName:ForkJoinPoolAction <br/>
 * Function: 使用ForkJoinPool完成一個任務的分段執行
 * 簡單的打印0-300的數值。用多線程實現並行執行
 * Date:     2017年12月4日 下午2:26:55 <br/>
 * @author   prd-lxw
 * @version   1.0
 * @since    JDK 1.7
 * @see      
 */
public class ForkJoinPoolAction {
    
    public static void main(String[] args) throws Exception{
        PrintTask task = new PrintTask(0, 300);
        //建立實例,並執行分割任務
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(task);
         //線程阻塞,等待全部任務完成
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();
    }
}

/**
 * ClassName: PrintTask <br/>
 * Function: 繼承RecursiveAction來實現「可分解」的任務。
 * date: 2017年12月4日 下午5:17:41 <br/>
 *
 * @author prd-lxw
 * @version 1.0
 * @since JDK 1.7
 */
class PrintTask extends RecursiveAction{
    private static final int THRESHOLD = 50; //最多隻能打印50個數
    private int start;
    private int end;
    
    

    public PrintTask(int start, int end) {
        super();
        this.start = start;
        this.end = end;
    }



    @Override
    protected void compute() {
        
        if(end - start < THRESHOLD){
            for(int i=start;i<end;i++){
                System.out.println(Thread.currentThread().getName()+"的i值:"+i);
            }
        }else {
            int middle =(start+end)/2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            //並行執行兩個「小任務」
            left.fork();
            right.fork();
        }
        
    }
    
}
複製代碼

 

執行結果:

複製代碼
ForkJoinPool-1-worker-1的i值:262
ForkJoinPool-1-worker-7的i值:75
ForkJoinPool-1-worker-7的i值:76
ForkJoinPool-1-worker-5的i值:225
ForkJoinPool-1-worker-3的i值:187
ForkJoinPool-1-worker-6的i值:150
ForkJoinPool-1-worker-6的i值:151
ForkJoinPool-1-worker-6的i值:152
ForkJoinPool-1-worker-6的i值:153
ForkJoinPool-1-worker-6的i值:154
......
複製代碼

 

由於個人電腦是i7處理器,一共8個cpu,觀察線程的名稱能夠發現,8個cpu都在運行。

經過RecursiveTask的返回值,來對一個長度爲100的數組元素進行累加。

複製代碼
/**
 * Project Name:Spring0725
 * File Name:ForJoinPollTask.java
 * Package Name:work1201.basic
 * Date:2017年12月4日下午5:41:46
 * Copyright (c) 2017, 深圳金融電子結算中心 All Rights Reserved.
 *
*/

package work1201.basic;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * ClassName:ForJoinPollTask <br/>
 * Function: 對一個長度爲100的元素值進行累加
 * Date:     2017年12月4日 下午5:41:46 <br/>
 * @author   prd-lxw
 * @version   1.0
 * @since    JDK 1.7
 * @see      
 */
public class ForJoinPollTask {

    public static void main(String[] args) throws Exception {
        int[] arr = new int[100];
        Random random = new Random();
        int total =0;
        //初始化100個數組元素
        for(int i=0,len = arr.length;i<len;i++){
            int temp = random.nextInt(20);
            //對數組元素賦值,並將數組元素的值添加到sum總和中
            total += (arr[i]=temp);
        }
        System.out.println("初始化數組總和:"+total);
        SumTask task = new SumTask(arr, 0, arr.length);
//        建立一個通用池,這個是jdk1.8提供的功能
        ForkJoinPool pool = ForkJoinPool.commonPool();
        Future<Integer> future = pool.submit(task); //提交分解的SumTask 任務
        System.out.println("多線程執行結果:"+future.get());
        pool.shutdown(); //關閉線程池
        
        

    }

}

/**
 * ClassName: SumTask <br/>
 * Function: 繼承抽象類RecursiveTask,經過返回的結果,來實現數組的多線程分段累累加
 *  RecursiveTask 具備返回值
 * date: 2017年12月4日 下午6:08:11 <br/>
 *
 * @author prd-lxw
 * @version 1.0
 * @since JDK 1.7
 */
class SumTask extends RecursiveTask<Integer>{
    private static final int THRESHOLD = 20; //每一個小任務 最多隻累加20個數
    private int arry[];
    private int start;
    private int end;
    
    

    /**
     * Creates a new instance of SumTask.
     * 累加從start到end的arry數組
     * @param arry
     * @param start
     * @param end
     */
    public SumTask(int[] arry, int start, int end) {
        super();
        this.arry = arry;
        this.start = start;
        this.end = end;
    }



    @Override
    protected Integer compute() {
        int sum =0;
        //當end與start之間的差小於threshold時,開始進行實際的累加
        if(end - start <THRESHOLD){
            for(int i= start;i<end;i++){
                sum += arry[i];
            }
            return sum;
        }else {//當end與start之間的差大於threshold,即要累加的數超過20個時候,將大任務分解成小任務
            int middle = (start+ end)/2;
            SumTask left = new SumTask(arry, start, middle);
            SumTask right = new SumTask(arry, middle, end);
            //並行執行兩個 小任務
            left.fork();
            right.fork();
            //把兩個小任務累加的結果合併起來
            return left.join()+right.join();
        }
        
    }
    
}
複製代碼

 

執行結果:

初始化數組總和:1008
多線程執行結果:1008

 

分析

在Java 7中引入了一種新的線程池:ForkJoinPool。

它同ThreadPoolExecutor同樣,也實現了Executor和ExecutorService接口。它使用了一個無限隊列來保存須要執行的任務,而線程的數量則是經過構造函數傳入,若是沒有向構造函數中傳入但願的線程數量,那麼當前計算機可用的CPU數量會被設置爲線程數量做爲默認值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用好比快速排序算法。

這裏的要點在於,ForkJoinPool須要使用相對少的線程來處理大量的任務。

好比要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會作出一樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,中止這樣的分割處理。好比,當元素的數量小於10時,會中止分割,轉而使用插入排序對它們進行排序。

那麼到最後,全部的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它全部的子任務完成以後,它纔可以被執行。

因此當使用ThreadPoolExecutor時,使用分治法會存在問題,由於ThreadPoolExecutor中的線程沒法像任務隊列中再添加一個任務而且在等待該任務完成以後再繼續執行。而使用ForkJoinPool時,就可以讓其中的線程建立新的任務,並掛起當前的任務,此時線程就可以從隊列中選擇子任務執行。

 

以上程序的關鍵是fork()和join()方法。在ForkJoinPool使用的線程中,會使用一個內部隊列來對須要執行的任務以及子任務進行操做來保證它們的執行順序。

那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼性能的差別呢?

首先,使用ForkJoinPool可以使用數量有限的線程來完成很是多的具備父子關係的任務,好比使用4個線程來完成超過200萬個任務。可是,使用ThreadPoolExecutor時,是不可能完成的,由於ThreadPoolExecutor中的Thread沒法選擇優先執行子任務,須要完成200萬個具備父子關係的任務時,也須要200萬個線程,顯然這是不可行的。

ps:ForkJoinPool在執行過程當中,會建立大量的子任務,致使GC進行垃圾回收,這些是須要注意的。

相關文章
相關標籤/搜索