fork/join框架

 

1. Fork/Join是什麼

 

  Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。Fork/Join框架要完成兩件事情:html

   Oracle的官方給出的定義是:Fork/Join框架是一個實現了ExecutorService接口的多線程處理器。它能夠把一個大的任務劃分爲若干個小的任務併發執行,充分利用可用的資源,進而提升應用的執行效率。java

    1.任務分割:首先Fork/Join框架須要把大的任務分割成足夠小的子任務,若是子任務比較大的話還要對子任務進行繼續分割算法

    2.執行任務併合並結果:分割的子任務分別放到雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都放在另一個隊列裏,啓動一個線程從隊列裏取數據,而後合併這些數據。數組

  Fork/Join實現了ExecutorService,因此它的任務也須要放在線程池中執行。它的不一樣在於它使用了工做竊取算法,空閒的線程能夠從滿負荷的線程中竊取任務來幫忙執行。多線程

  Fork/Join框架的核心是繼承了AbstractExecutorService的ForkJoinPool類,它保證了工做竊取算法和ForkJoinTask的正常工做。併發

  在Java的Fork/Join框架中,使用兩個類完成上述操做框架

  1.ForkJoinTask:咱們要使用Fork/Join框架,首先須要建立一個ForkJoin任務。該類提供了在任務中執行fork和join的機制。一般狀況下咱們不須要直接集成ForkJoinTask類,只須要繼承它的子類,Fork/Join框架提供了兩個子類:異步

    a.RecursiveAction:用於沒有返回結果的任務ide

    b.RecursiveTask:用於有返回結果的任務ui

  2.ForkJoinPool:ForkJoinTask須要經過ForkJoinPool來執行

    任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務(工做竊取算法)。

    執行方法有同步的:invoke,異步的execute和submit

2. Fork/Join框架的實現原理

  ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務。

2.1 ForkJoinTask的Fork方法的實現原理:  

  當咱們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,異步地執行這個任務,而後當即返回結果,代碼以下:

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

  pushTask方法把當前任務存放在ForkJoinTask數組隊列裏。而後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工做線程來執行任務。代碼以下:

final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

2.2 ForkJoinTask的Fork方法的實現原理:

當咱們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,異步地執行這個任務,而後當即返回結果,代碼以下:

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

  pushTask方法把當前任務存放在ForkJoinTask數組隊列裏。而後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工做線程來執行任務。代碼以下:  

final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

2.3 ForkJoinTask的join方法實現原理

  Join方法的主要做用是阻塞當前線程並等待獲取結果。讓咱們一塊兒看看ForkJoinTask的join方法的實現,代碼以下:

public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

  它首先調用doJoin方法,經過doJoin()方法獲得當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)。

  若是任務狀態是已完成,則直接返回任務結果。

  若是任務狀態是被取消,則直接拋出CancellationException

  若是任務狀態是拋出異常,則直接拋出對應的異常

  讓咱們分析一下doJoin方法的實現

private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

  在doJoin()方法裏,首先經過查看任務的狀態,看任務是否已經執行完成,若是執行完成,則直接返回任務狀態;若是沒有執行完,則從任務數組裏取出任務並執行。若是任務順利執行完成,則設置任務狀態爲NORMAL,若是出現異常,則記錄異常,並將任務狀態設置爲EXCEPTIONAL。

2.4 Fork/Join框架的異常處理

  ForkJoinTask在執行的時候可能會拋出異常,可是咱們沒辦法在主線程裏直接捕獲異常,因此ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,而且能夠經過ForkJoinTask的getException方法獲取異常。使用以下代碼:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}

  getException方法返回Throwable對象,若是任務被取消了則返回CancellationException。若是任務沒有完成或者沒有拋出異常則返回null。

public final Throwable getException() {
        int s = status & DONE_MASK;
        return ((s >= NORMAL)    ? null :
                (s == CANCELLED) ? new CancellationException() :
                getThrowableException());
    } 

3. Fork/Join的基本用法

3.1 任務類FrokJoinTask

上文已經提到,Fork/Join就是要講一個大的任務分割成若干小的任務,因此第一步固然是要作任務的分割,大體方式以下:

if (這個任務足夠小){ 
  執行要作的任務 

} else { 

  將任務分割成兩小部分 

  執行兩小部分並等待執行結果 

}  

 

要實現FrokJoinTask咱們須要一個繼承了RecursiveTask或RecursiveAction的基類,並根據自身業務狀況將上面的代碼放入基類的coupute方法中。RecursiveTask和RecursiveAction都繼承了FrokJoinTask,它倆的區別就是RecursiveTask有返回值而RecursiveAction沒有。下面是我作的一個選出字符串列表中還有"a"的元素的Demo:

@Override 
   protected List<String> compute() { 

       // 當end與start之間的差小於閾值時,開始進行實際篩選 

       if (end - this.start < threshold) { 

           List<String> temp = list.subList(this.start, end); 

       return temp.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList()); 

       } else { 

           // 若是當end與start之間的差大於閾值時 

           // 將大任務分解成兩個小任務。 

       int middle = (this.start + end) / 2; 

           ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold); 

           ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold); 

           // 並行執行兩個「小任務」 

      left.fork(); 

      right.fork(); 

           // 把兩個「小任務」的結果合併起來 

           List<String> join = left.join(); 

      join.addAll(right.join()); 

      return join; 

       } 

   }  

  

 

3.2 執行類ForkJoinPool

作好了基類就能夠開始調用了,調用時首先咱們須要Fork/Join線程池ForkJoinPool,而後向線程池中提交一個ForkJoinTask並獲得結果。ForkJoinPool的submit方法的入參是一個ForkJoinTask,返回值也是一個ForkJoinTask,它提供一個get方法能夠獲取到執行結果。

代碼以下:

     ForkJoinPool pool = new ForkJoinPool(); 
        // 提交可分解的ForkJoinTask任務 
        ForkJoinTask<List<String>> future = pool.submit(forkJoinService); 
        System.out.println(future.get()); 
        // 關閉線程池 
        pool.shutdown();  

  

就這樣咱們就完成了一個簡單的Fork/Join的開發。

提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封裝的方法也都用到了Fork/Join。

3.3 ForkJoinPool的維護方法

ForkJoinPool提供了一系列的維護方法,好比獲取執行線程數量,完成數量,查看偷取任務數等。

4. 實例代碼

 1. 定義抽象類(用於拓展,此例中沒有實際做用,能夠不定義此類):

import java.util.concurrent.RecursiveTask; 
 
/** 
 * Description: ForkJoin接口 
 * Designer: jack 
 * Date: 2017/8/3 
 * Version: 1.0.0 
 */ 
public abstract class ForkJoinService<T> extends RecursiveTask<T>{ 
    @Override 
    protected abstract T compute(); 
}  

 2. 任務類

import java.util.List; 
import java.util.stream.Collectors; 
 
/** 
 * Description: ForkJoin基類 
 * Designer: jack 
 * Date: 2017/8/3 
 * Version: 1.0.0 
 */ 
public class ForkJoinTest extends ForkJoinService<List<String>> { 
 
    private static ForkJoinTest forkJoinTest; 
    private int threshold;  //閾值 
    private List<String> list; //待拆分List 
 
    private ForkJoinTest(List<String> list, int threshold) { 
        this.list = list; 
        this.threshold = threshold; 
    } 
 
    @Override 
    protected List<String> compute() { 
        // 當end與start之間的差小於閾值時,開始進行實際篩選 
        if (list.size() < threshold) { 
            return list.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList()); 
        } else { 
            // 若是當end與start之間的差大於閾值時,將大任務分解成兩個小任務。 
            int middle = list.size() / 2; 
            List<String> leftList = list.subList(0, middle); 
            List<String> rightList = list.subList(middle, list.size()); 
            ForkJoinTest left = new ForkJoinTest(leftList, threshold); 
            ForkJoinTest right = new ForkJoinTest(rightList, threshold); 
            // 並行執行兩個「小任務」 
            left.fork(); 
            right.fork(); 
            // 把兩個「小任務」的結果合併起來 
            List<String> join = left.join(); 
            join.addAll(right.join()); 
            return join; 
        } 
    } 
 
    /** 
     * 獲取ForkJoinTest實例 
     * @param list  待處理List 
     * @param threshold 閾值 
     * @return ForkJoinTest實例 
     */ 
    public static ForkJoinService<List<String>> getInstance(List<String> list, int threshold) { 
        if (forkJoinTest == null) { 
            synchronized (ForkJoinTest.class) { 
                if (forkJoinTest == null) { 
                    forkJoinTest = new ForkJoinTest(list, threshold); 
                } 
            } 
        } 
        return forkJoinTest; 
    } 
}  

 3. 執行類

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ForkJoinPool; 
import java.util.concurrent.ForkJoinTask; 
 
/** 
 * Description: Fork/Join執行類 
 * Designer: jack 
 * Date: 2017/8/3 
 * Version: 1.0.0 
 */ 
public class Test { 
 
    public static void main(String args[]) throws ExecutionException, InterruptedException { 
 
        String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te", 
                "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"}; 
        List<String> stringList = new ArrayList<>(Arrays.asList(strings)); 
 
        ForkJoinPool pool = new ForkJoinPool(); 
        ForkJoinService<List<String>> forkJoinService = ForkJoinTest.getInstance(stringList, 20); 
        // 提交可分解的ForkJoinTask任務 
        ForkJoinTask<List<String>> future = pool.submit(forkJoinService); 
        System.out.println(future.get()); 
        // 關閉線程池 
        pool.shutdown(); 
 
    } 
 
}  

  

參考

http://developer.51cto.com/art/201708/547413.htm

https://www.cnblogs.com/senlinyang/p/7885964.html

相關文章
相關標籤/搜索