併發編程之Fork/Join

併發與並行

併發:多個進程交替執行。java

並行:多個進程同時進行,不存在線程的上下文切換。算法

併發與並行的目的都是使CPU的利用率達到最大。Fork/Join就是爲了儘量提升硬件的使用率而應運而生的。數組

計算密集型與IO密集型

計算密集型:也稱之爲CPU密集型,此時系統的硬盤,內存性能相對於CPU要不少。系統在運做的時候CPU是處於100% loading的狀態,在系統完成磁盤的讀寫(I/O)之後,程序就會進行計算,在進行計算的時候CPU佔用率是很高的。計算密集型任務最大的特色就是進行大量的計算,消耗CPU資源,好比說高清解碼,計算圓周率啥的,都是靠CPU的運算能力。這種類型的任務雖然也支持多任務,可是花費在任務切換的時間越多,執行效率就越低,要最高效的利用cpu,建議任務數小於核心線程數。代碼運行效率也很關鍵,通常使用C語言來寫。線程數的設置:CPU核數+1(現代CPU支持超線程)。網絡

IO密集型:CPU性能要比硬盤,內存性能好不少。這時候,大部分的狀況是CPU在等I/O的讀寫操做,此時CPU loading並非很高。I/O bound的程序通常在達到極限的時候,CPU利用率仍然比較低。對於IO密集型的任務主要涉及到網絡,磁盤IO.特色是CPU消耗不多,任務的大部分時間都是在等待IO操做完成(磁盤IO的速度遠遠低於cpu與內存的速度)。對於這種任務,任務越多,CPU的效率越高。對於這種任務適合使用開發效率最高的腳本語言,C語言基本上沒啥用。線程數的設置:(線程等待時間+線程CPU時間)/線程CPU時間)*CPU數目。多線程

如何利用多核CPU,計算很大數組中全部整數進行排序?併發

當數據量小的時候使用快速排序快,快速排序顯著的特徵是用遞歸的方法去排序的。當數據量大的時候歸遞排序。遞歸排序的思想就是在數組中取一箇中間值,將一個數組分爲2個,一個比中間值大,一個比中間值小,如此反覆拆分排序,直到最後沒法再進行拆分,而後將結果合併。所以遞歸方法除了空間複雜度增長了,還可能會產生棧溢出。(程序計數器是惟一不會發生棧溢出的),虛擬機棧默認最大空間是1M.   app

分治思想:就是將一個規模大的問題劃分爲規模較小的子問題,而後逐步解決小問題,最後合併子問題的解就獲得了原問題的解。即分割原問題--求解子問題--合併子問題的解。框架

子問題通常都是相互獨立的,所以,一般經過遞歸調用算法來求解子問題。dom

Fork/Join框架

  Fork/Join 是一個用於並行執行任務的框架,是一個把大任務拆分紅小任務,執行小任務,最後彙總小任務的結果獲得大任務的結果的框架。總體框架以下:異步

 

 

 

Fork/Join 特徵:

一、ForkJoinPool是ExecutorService的補充 ,適用於一些特定的場景,適合於計算密集型場景。若是存在I/O,線程間同步,sleep()等會形成線程長時間阻塞的狀況,此時能夠配合ManagedBlocker使用。

二、ForkJoinPool主要是實現分治法,分治以後遞歸調用函數。

 

ForkJoinPool 框架主要類

ForkJoinPool 實現ForkJoin的線程池 - ThreadPool

ForkJoinWorkerThread 實現ForkJoin的線程

ForkJoinTask<V> 一個描述ForkJoin的抽象類 Runnable/Callable

RecursiveAction 無返回結果的ForkJoinTask實現Runnable

RecursiveTask<V> 有返回結果的ForkJoinTask實現Callable

CountedCompleter<T> 在任務完成執行後會觸發執行一個自定義的鉤子函數

提交任務:

 

 

fork()相似於Thread.start(),可是它並不當即執行任務,而是將任務放入工做隊列中, 跟Thread.join()不一樣,ForkJoinTask的join()方法並不簡單的阻塞線程 利用工做線程運行其餘任務, 當一個工做線程中中調用join(),它將處理其餘任務,直到注意到目標子任務已經完成。

 ForkJoinPool中的全部工做線程都有一個本身的工做隊列WorkQueue,是一個雙端隊列Deque,從隊頭取任務,先進後出,線程私有,不共享。

以下圖所示:

 

 

線程竊取

工做竊取就是指某個線程從其餘隊列裏竊取任務來執行。在ForkJoinPool中就是將一個大任務分紅n個互不依賴的子任務,爲了減小線程之間的競爭,因而把這些子任務放到不一樣的隊列當中去,併爲每個對列建立一個線程來執行隊列中的任務,A隊列的任務由A線程來執行。可是有的線程執行得比較快,很快就把本身隊列當中的任務執行完成了,可是A隊列裏還有待執行的任務,這時候這個線程(假設是B線程)就會去竊取他的隊列當中的任務來執行。爲了減小竊取任務線程與被竊取任務線程之間的競爭,採用雙端隊列,竊取任務是從隊尾竊取,被竊取任務線程從隊頭獲取任務來執行。

爲了儘量的提升CPU的利用率,空閒的線程將從其餘線程的隊列中竊取任務來執行,從workQueue的隊尾竊取任務,從而減小競爭,任務的竊取是聽從FIFO順序進行的,由於先放入的任務每每表示更大的工做量,竊取來的任務支持進一步的遞歸分解。

WorkQueue雙端隊列最小化任務「竊取」的競爭, push()/pop()僅在其全部者工做線程中調用 ,這些操做都是經過CAS來實現的,是Wait-free的 。

poll() 則由其餘工做線程來調用「竊取」任務 可能不是wait-free。任務竊取的好處就是充分利用了資源,可是也有缺點,當隊列當中只有一個任務的時候,就會出現競爭,而且系統會耗費更多的資源,好比建立多個線程和多個雙端隊列。

 

 

 總結一下就是:

1. ForkJoinPool 的每一個工做線程都維護着一個工做隊列(WorkQueue),這是一個雙端隊列(Deque),裏面存放的對象是任務(ForkJoinTask)。
2. 每一個工做線程在運行中產生新的任務(一般是由於調用了 fork())時,會放入工做隊列的隊頭(左爲隊尾,右側爲隊頭),而且工做線程在處理本身的工做隊列時,使用的是 LIFO 方式,也就是說每次從隊頭取出任務來執行。(ForkJoinTask的fork()的子任務,將放入運行該任務的工做線程的隊頭,工做線程以LIFO的順序來處理隊列中的任務)
3. 每一個工做線程在處理本身的工做隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其餘工做線程的工做隊列),竊取的任務位於其餘線程的工做隊列的隊尾,也就是說工做線程在竊取其餘工做線程的任務時,使用的是FIFO 方式。
4. 在遇到 join() 時,若是須要 join 的任務還沒有完成,則會先處理其餘任務,並等待其完成。
5. 在既沒有本身的任務,也沒有能夠竊取的任務時,進入休眠。

代碼以下:

 public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

爲了測試ForkJoinPool的好處,咱們來看如下兩段代碼,來對比一下結果:

首先咱們來看一下,就用本身寫的分任務執行,來計算

package com.test.executor.arrsum;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.executor.arrsum.utils.Utils;

public class SumRecursiveMT {
    public static class RecursiveSumTask implements Callable<Long> {
        public static final int SEQUENTIAL_CUTOFF = 1;
        int lo;
        int hi;
        int[] arr; // arguments
        ExecutorService executorService;

        RecursiveSumTask( ExecutorService executorService, int[] a, int l, int h) {
            this.executorService = executorService;
            this.arr = a;
            this.lo = l;
            this.hi = h;
        }

        public Long call() throws Exception { // override
            System.out.format("%s range [%d-%d] begin to compute %n",
                    Thread.currentThread().getName(), lo, hi);
            long result = 0;
            if (hi - lo <= SEQUENTIAL_CUTOFF) {
                for (int i = lo; i < hi; i++)
                    result += arr[i];

                System.out.format("%s range [%d-%d] begin to finished %n",
                        Thread.currentThread().getName(), lo, hi);
            }
            else {
                RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);
                RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);
                Future<Long> lr = executorService.submit(left);
                Future<Long> rr = executorService.submit(right);

                result = lr.get() + rr.get();
                System.out.format("%s range [%d-%d] finished to compute %n",
                        Thread.currentThread().getName(), lo, hi);
            }

            return result;
        }
    }


    public static long sum(int[] arr) throws Exception {
        int nofProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        //ExecutorService executorService = Executors.newCachedThreadPool();

        RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);
        long result =  executorService.submit(task).get();
        return result;
    }

  //執行該方法,看看測試結果
public static void main(String[] args) throws Exception { int[] arr = Utils.buildRandomIntArray(20); System.out.printf("The array length is: %d\n", arr.length); long result = sum(arr); System.out.printf("The result is: %d\n", result); } } package com.test.executor.arrsum.utils; import java.util.Random; public class Utils { public static int[] buildRandomIntArray(int size) { int[] array = new int[size]; for (int i = 0; i < size; i++) { array[i] = new Random().nextInt(100); } return array; } public static int[] buildRandomIntArray() { int size = new Random().nextInt(100); int[] array = new int[size]; for (int i = 0; i < size; i++) { array[i] = new Random().nextInt(100); } return array; } public static void main(String[] args) { int[] ints = Utils.buildRandomIntArray(20); for (int i = 0; i < ints.length; i++) { System.out.println(ints[i]); } } }
package com.test.executor.arrsum;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.executor.arrsum.utils.Utils;

public class SumRecursiveMT {
    public static class RecursiveSumTask implements Callable<Long> {
        public static final int SEQUENTIAL_CUTOFF = 1;
        int lo;
        int hi;
        int[] arr; // arguments
        ExecutorService executorService;

        RecursiveSumTask( ExecutorService executorService, int[] a, int l, int h) {
            this.executorService = executorService;
            this.arr = a;
            this.lo = l;
            this.hi = h;
        }

        public Long call() throws Exception { // override
            System.out.format("%s range [%d-%d] begin to compute %n",
                    Thread.currentThread().getName(), lo, hi);
            long result = 0;
            if (hi - lo <= SEQUENTIAL_CUTOFF) {
                for (int i = lo; i < hi; i++)
                    result += arr[i];

                System.out.format("%s range [%d-%d] begin to finished %n",
                        Thread.currentThread().getName(), lo, hi);
            }
            else {
                RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);
                RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);
                Future<Long> lr = executorService.submit(left);
                Future<Long> rr = executorService.submit(right);

                result = lr.get() + rr.get();
                System.out.format("%s range [%d-%d] finished to compute %n",
                        Thread.currentThread().getName(), lo, hi);
            }

            return result;
        }
    }


    public static long sum(int[] arr) throws Exception {
        int nofProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        //ExecutorService executorService = Executors.newCachedThreadPool();

        RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);
        long result =  executorService.submit(task).get();
        return result;
    }

    public static void main(String[] args) throws Exception {
        int[] arr = Utils.buildRandomIntArray(20);
        System.out.printf("The array length is: %d\n", arr.length);
        
        long result = sum(arr);

        System.out.printf("The result is: %d\n", result);

    }
}

 

運行該代碼的結果以下:

 

 結果一直沒有出來,就說明一直在計算。由於線程在遞歸計算,開的線程太多,而後計算時間比較長。

ForkJoin的使用

ForkJoinTask:咱們要使用ForkJoin框架,就要建立一個ForkJoin 任務,建立ForkJoin任務的話,不須要直接繼承ForkJoinTask類,而是繼承他的子類.ForkJoin框架有兩個子類RecursiveAction和RecursiveTask<V>。

  一、RecursiveAction:用於返回沒有結果的任務。(好比寫數據到磁盤之後就退出。一個RecursiveAction能夠把工做分割成若干小塊,由獨立的線程或者CPU執行,經過繼承實現RecursiveAction)

  二、RecursiveTask<V> :用於執行有返回結果的任務。(將一個任務分割成若干的子任務,每一個子任務返回的值合併到一個集體結果,能夠水平的分割和合並。)

 ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行。任務分割出來的子任務會添加到當前工做線程的雙端隊列當中,進入隊列的頭部。當一個工做線程的隊列中沒有任務的時候它會從其餘隊列的尾部獲取任務來執行。

接下來來看看用ForkJoinPool來計算的代碼:

package com.test.executor.arrsum;

import java.util.concurrent.RecursiveTask;

/**
 * RecursiveTask 並行計算,同步有返回值
 * ForkJoin框架處理的任務基本都能使用遞歸處理,好比求斐波那契數列等,但遞歸算法的缺陷是:
 *    一隻會只用單線程處理,
 *    二是遞歸次數過多時會致使堆棧溢出;
 * ForkJoin解決了這兩個問題,使用多線程併發處理,充分利用計算資源來提升效率,同時避免堆棧溢出發生。
 * 固然像求斐波那契數列這種小問題直接使用線性算法搞定可能更簡單,實際應用中徹底不必使用ForkJoin框架,
 * 因此ForkJoin是核彈,是用來對付你們夥的,好比超大數組排序。
 * 最佳應用場景:多核、多內存、能夠分割計算再合併的計算密集型任務
 */
class LongSum extends RecursiveTask<Long> {

    static final int SEQUENTIAL_THRESHOLD = 1000;
    static final long NPS = (1000L * 1000 * 1000);
    static final boolean extraWork = true; // change to add more than just a sum


    int low;
    int high;
    int[] array;

    LongSum(int[] arr, int lo, int hi) {
        array = arr;
        low = lo;
        high = hi;
    }

    /**
     * fork()方法:將任務放入隊列並安排異步執行,一個任務應該只調用一次fork()函數,除非已經執行完畢並從新初始化。
     * tryUnfork()方法:嘗試把任務從隊列中拿出單獨處理,但不必定成功。
     * join()方法:等待計算完成並返回計算結果。
     * isCompletedAbnormally()方法:用於判斷任務計算是否發生異常。
     */
    protected Long compute() {

        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; ++i) {
                sum += array[i];
            }
            return sum;

        } else {
            int mid = low + (high - low) / 2;
            LongSum left = new LongSum(array, low, mid);
            LongSum right = new LongSum(array, mid, high);
            left.fork();
            right.fork();
            long rightAns = right.join();
            long leftAns = left.join();
            return leftAns + rightAns;
        }
    }
}

       package com.test.executor.arrsum;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import com.test.executor.arrsum.utils.Utils;

public class LongSumMain {

    //獲取邏輯處理器數量
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** for time conversion */
    static final long NPS = (1000L * 1000 * 1000);

    static long calcSum;

    static final boolean reportSteals = true;

    public static void main(String[] args) throws Exception {
        int[] array = Utils.buildRandomIntArray(20000000);
        System.out.println("cpu-num:"+NCPU);
        //單線程下計算數組數據總和
         calcSum = seqSum(array);
        System.out.println("seq sum=" + calcSum);

        //採用fork/join方式將數組求和任務進行拆分執行,最後合併結果
        LongSum ls = new LongSum(array, 0, array.length);
          ForkJoinPool fjp  = new ForkJoinPool(NCPU); //使用的線程數
        ForkJoinTask<Long> task = fjp.submit(ls);
        System.out.println("forkjoin sum=" + task.get());

        if(task.isCompletedAbnormally()){
            System.out.println(task.getException());
        }

        fjp.shutdown();

    }


    static long seqSum(int[] array) {
        long sum = 0;
        for (int i = 0; i < array.length; ++i)
            sum += array[i];
        return sum;
    }

}

以上的運行結果就很快:

cpu-num:4
seq sum=989877234
forkjoin sum=989877234

 Fork/Join框架原理

異常處理

   ForkJoinTask在執行任務的時候可能會拋異常,此時咱們沒有辦法從主線程裏面獲取異常,因此咱們使用如下幾種方法來判斷以及獲取異常:

  一、isCompletedAbnormally()方法來判斷任務有沒有拋出異常或者被取消。

  二、getException()能夠獲取到異常。

  三、isCompletedNormally()這個方法是看任務是否正常執行完成且沒有任何異常。

  示例:

if(task.isCompletedAbnormally())
   System.out.print(task.getException());

ForkJoinPool構造方法

 public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
 public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        checkPermission();
        if (factory == null)
            throw new NullPointerException();
        if (parallelism <= 0 || parallelism > MAX_ID)
            throw new IllegalArgumentException();
        this.parallelism = parallelism;
        this.factory = factory;
        this.ueh = handler;
        this.locallyFifo = asyncMode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // initialize workers array with room for 2*parallelism if possible
        int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;
        else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        workers = new ForkJoinWorkerThread[n + 1];
        this.submissionLock = new ReentrantLock();
        this.termination = submissionLock.newCondition();
        StringBuilder sb = new StringBuilder("ForkJoinPool-");
        sb.append(poolNumberGenerator.incrementAndGet());
        sb.append("-worker-");
        this.workerNamePrefix = sb.toString();
    }

重要參數說明:

一、parallelism:並行數。通常跟CPU個數保持一致。經過Runtime.getRuntime().availableProcessors()能夠獲取到當前機器的CPU個數。

二、ForkJoinWorkerThreadFactory factory:建立線程的工廠

三、Handler  :線程異常處理器,Thread.UncaughtExceptionHandler ,該處理器在線程執行任務時因爲某些沒法預料到的錯誤而致使任務線程中斷時進行一些處理,默認狀況爲null。

 四、boolean asyncMode: 表示工做線程內的任務隊列是採用何種方式進行調度,能夠是先進先出FIFO,也能夠是先進後出FILO.若是爲true,則表示線程池中的線程使用先進先出的方式進行調度,默認爲false.

ForkJoinTask fork()/join()方法

一、fork():這個方法的做用就是將任務放到當前線程的工做隊列當中去;

public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

二、join()的方法咱們先看一下代碼:

 private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

  */
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }
public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

一、檢查調用Join()方法的線程是不是ForkJoinWorkerThread,若是不是的話就阻塞當前線程,等待任務完成,若是是則不阻塞;

二、判斷任務的狀態,是否已經完成,若是已經完成,則返回結果;

三、任務沒有完成,判斷任務是否處於本身的隊列當中,若是是,就取出執行完任務;

四、任務沒有在本身隊列當中,則說明任務被偷走,找到偷走任務的小偷,竊取小偷工做隊列中的任務,並執行,幫助小偷快點完成待join的任務;

五、若小偷偷走的任務已經完成,則找到小偷的小偷,幫助他完成任務;

六、遞歸執行5;

整體概括起來的流程以下:

 

 

 ForkJoinPool 之submit()方法

  public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
        return task;
    }
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }

ForkJoinPool有本身的工做隊列,這些工做對列是用來接收由外部線程(非ForkJoinThread)提交過來的任務,這個對列稱爲submittingQueue。submit()和fork()沒有本質的區別,只是提交對象是submittingQueue.submittingQueue也是工做線程竊取對象,當其中的任務被工做線程竊取成功的時候,表明提交任務正式進入執行階段。

 

Fork/Join框架執行流程

相關文章
相關標籤/搜索