線程池 一 ForkJoinPool

java.util.concurrent
public class ForkJoinPool extends AbstractExecutorService
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
public class ForkJoinWorkerThread extends Thread

Fork/Join

Fork/Join技術是分治算法(Divide-and-Conquer)的並行實現,它是一項能夠得到良好的並行性能的簡單且高效的設計技術。

ForkJoinPool的優點在於,能夠充分利用多cpu,多核cpu的優點,把一個任務拆分紅多個「小任務」,
把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。

JDK用來執行Fork/Join任務的工做線程池默認大小等於CPU核心數。在一個4核CPU上,最多能夠同時執行4個子任務。

目的是爲了幫助咱們更好地利用多處理器帶來的好處,使用全部可用的運算能力來提高應用的性能。


1. Fork/Join框架主要由ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask來實現

2. ForkJoinPool中只能夠運行ForkJoinTask類型的任務
    (在實際使用中,也能夠接收Runnable/Callable任務,但在真正運行時,也會把這些任務封裝成ForkJoinTask類型的任務)

3. ForkJoinTask表示一個任務,
    ForkJoinTask的子類中有RecursiveAction和RecursiveTask
    RecursiveAction無返回結果,RecursiveTask有返回結果
    工做中,通常重寫RecursiveAction或RecursiveTask的compute(),完成計算或者能夠進行任務拆分。

4. 調用ForkJoinTask的fork()的方法,可讓其餘空閒的線程執行這個ForkJoinTask
    調用ForkJoinTask的join()的方法,將多個小任務的結果進行彙總。

5. ForkJoinWorkerThread是運行ForkJoinTask任務的工做線程

work-stealing(工做竊取)算法

ForkJoinPool中,線程池中每一個工做線程(ForkJoinWorkerThread)都對應一個任務隊列(WorkQueue),
工做線程優先處理來自自身隊列的任務(LIFO),而後以FIFO的順序隨機竊取其餘隊列中的任務。java

ForkJoinPool

ForkJoinPool並行的實現了分治算法(Divide-and-Conquer):把任務遞歸的拆分爲各個子任務,這樣能夠更好的利用系統資源算法

ForkJoinPool中的任務分爲兩種:框架

  1. 一種是本地提交的任務(Submission task,如 execute、submit 提交的任務)
  2. 另一種是 fork 出的子任務(Worker task)

提交任務

外部任務(external/submissions task)提交三種方式:

    execute()是直接向池提交一個任務來異步執行,無返回結果;

    invoke()會一直阻塞到任務執行完成返回計算結果

    submit()也是異步執行,可是會返回提交的任務,在適當的時候可經過task.get()獲取執行結果,實現同步到主線程


子任務(Worker task)提交:
    由任務的fork()方法完成。
    任務被分割(fork)以後調用了ForkJoinPool.WorkQueue.push()方法直接把任務放到隊列中等待被執行。

    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        t2.fork();
        if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
            t1.reportException(s1);
        if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
            t2.reportException(s2);
    }

示例

對於fork/join模式,假如pool裏面線程數量是固定的,那麼調用子任務的fork方法至關於A先分工給B和C,
而後A當監工不幹活,B和C去完成A交代的任務。
因此上面的模式至關於浪費了一個線程。

若是使用invokeAll至關於A分工給B和C後,A和B和C都去完成工做。這樣縮短了執行的時間。


/**
 * 測試 ForkJoinPool 線程池的使用
 */
public class task {

    /**
     * 測試使用 ForkJoinPool 無返回值的任務執行
     */
    public static void noResultTask() throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new PrintTask(1, 200));
        pool.shutdown();
    }
}


/**
 * 無返回值的打印任務
 */
class PrintTask extends RecursiveAction {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 50;
    private int start;
    private int end;

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        //當 結束值 比 起始值 大於 50 時,按數值區間平均拆分爲兩個任務;不然直接打印該區間的值
        if (end - start < THRESHOLD) {
            for (int i = start; i <= end; i++) {
                System.out.println(Thread.currentThread().getName() + ", i = " + i);
            }
        } else {
            int middle = (start + end) / 2;
            //分紅兩個子任務
            PrintTask firstTask = new PrintTask(start, middle);
            PrintTask secondTask = new PrintTask(middle + 1, end);
            //任務拆分
            //firstTask.fork();
            //secondTask.fork();
            invokeAll(firstTask,secondTask);
        }
    }
}


public class task {

    /**
     * 測試使用 ForkJoinPool 有返回值的任務執行,對結果進行合併。計算 1 到 200 的累加和
     */
    public static void hasResultTask() throws Exception {
        //線程池
        ForkJoinPool pool = new ForkJoinPool();
        //提交任務
        ForkJoinTask<Integer> task = pool.submit(new CalculateTask(1, 200));
        //獲得結果
        int result = task.get();
        //關閉線程
        pool.shutdown();
    }

}


/**
 * 有返回值的計算任務
 */
class CalculateTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 50;
    private int start;
    private int end;

    public CalculateTask(int start, int end) {
        super();
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        //當 結束值 比 起始值 大於 50 時,按數值區間平均拆分爲兩個任務,進行兩個任務的累加值彙總
        //不然直接計算累加值
        if (end - start <= THRESHOLD) {
            int result = 0;
            for (int i = start; i <= end; i++) {
                result += i;
            }
            return result;
        } else {
            int middle = (start + end) / 2;
            CalculateTask firstTask = new CalculateTask(start, middle);
            CalculateTask secondTask = new CalculateTask(middle + 1, end);
            //任務拆分
            invokeAll(firstTask,secondTask);
            //任務合併
            return firstTask.join() + secondTask.join();
        }
    }

}
相關文章
相關標籤/搜索