多線程-Fork/Join

Fork/Join

Java7提供了Fork/Join來支持將一個任務拆分紅多個「小任務」並行計算,再把多個「小任務」的結果合併成總的計算結果。java

類圖

Java7提供了ForkJoinPool來支持將一個任務拆分爲多個小任務並行計算,再把多個小任務的結果合併成總的計算結果。ForkJoinPool是ExecutorService的實現類,所以是一種特殊的線程池。
ForkJoinPool(int n)建立一個包含n個並行線程的ForkJoinPool
ForkJoinPool()建立一個Runtime.availableProcessors()返回值個數的並行線程。
ForkJoinTask表明一個能夠並行、合併的任務,是一個抽象類,它還有兩個抽象子類:RecursiveAction和RecursiveTask。其中RecursiveTask表明有返回值的任務,而RecursiveAction表明沒有返回值的任務。

RecursiveAction實例:

package org.github.lujiango;  
  
import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.RecursiveAction;  
import java.util.concurrent.TimeUnit;  
  
class PrintTask extends RecursiveAction {  
    private static final long serialVersionUID = 1L;  
    private static final int threshold = 50;  
    private int start;  
    private int end;  
  
    public PrintTask(int start, int end) {  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected void compute() {  
        if (end - start < threshold) {  
            for (int i = start; i < end; i++) {  
                System.out.println(Thread.currentThread().getName() + " i: " + i);  
            }  
        } else {  
            int middle = (start + end) / 2;  
            PrintTask left = new PrintTask(start, middle);  
            PrintTask right = new PrintTask(middle, end);  
            left.fork();  
            right.fork();  
        }  
    }  
  
}  
  
public class Test20 {  
  
    public static void main(String[] args) throws InterruptedException {  
        ForkJoinPool pool = new ForkJoinPool();  
        pool.submit(new PrintTask(0, 300));  
        pool.awaitTermination(2, TimeUnit.SECONDS);  
        pool.shutdown();  
    }  
  
}  

分解後的任務分別調用fork()方法開始並行執行。git

RecursiveTask<T>實例:

 
若是大任務是有返回值的任務,則可讓任務繼承RecursiveTask<T>,其中泛型參數T就表明了該任務的返回值類型。
package org.github.lujiango;  
  
import java.util.Random;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.Future;  
import java.util.concurrent.RecursiveTask;  
  
class CalTask extends RecursiveTask<Integer> {  
    private static final long serialVersionUID = 1L;  
    private static final int threshold = 20;  
    private int[] arr;  
    private int start;  
    private int end;  
  
    public CalTask(int[] arr, int start, int end) {  
        this.arr = arr;  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Integer compute() {  
        int sum = 0;  
        if (end - start < threshold) {  
            for (int i = start; i < end; i++) {  
                sum += arr[i];  
            }  
            return sum;  
        } else {  
            int middle = (start + end) / 2;  
            CalTask left = new CalTask(arr, start, middle);  
            CalTask right = new CalTask(arr, middle, end);  
            left.fork();  
            right.fork();  
            return left.join() + right.join();  
        }  
    }  
  
}  
  
public class Test21 {  
  
    public static void main(String[] args) throws InterruptedException, ExecutionException {  
        int[] arr = new int[100];  
        Random random = new Random();  
        int total = 0;  
        for (int i = 0, len = arr.length; i < len; i++) {  
            int tmp = random.nextInt(20);  
            total += (arr[i] = tmp);  
        }  
        System.out.println(total);  
  
        ForkJoinPool pool = new ForkJoinPool();  
        Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));  
        System.out.println(future.get());  
        pool.shutdown();  
  
    }  
  
}  
相關文章
相關標籤/搜索