ForkJoin框架之ForkJoinTask

前言

在前面的文章"CompletableFuture和響應式編程"中提到了ForkJoinTask和ForkJoinPool,後者毫無疑問是一個線程池,前者則是一個相似FutureTask經典定義的概念.node

官方有一個很是無語的解釋:ForkJoinTask就是運行在ForkJoinPool的一個任務抽象,ForkJoinPool就是運行ForkJoinTask的線程池.算法

ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子類,它的核心在於分治和工做竅取,最大程度利用線程池中的工做線程,避免忙的忙死,餓的餓死.編程

ForkJoinTask能夠理解爲類線程但比線程輕量的實體,在ForkJoinPool中運行的少許ForkJoinWorkerThread能夠持有大量的ForkJoinTask和它的子任務.ForkJoinTask同時也是一個輕量的Future,使用時應避免較長阻塞和io.api

ForkJoinTask在JAVA8中應用普遍,但它是一個抽象類,它的子類派生了各類用途,如後續計劃單獨介紹的CountedCompleter,以及若干JAVA8中stream api定義的與並行流有關的各類操做(ops).數組

源碼

首先看ForkJoinTask的簽名.併發

public abstract class ForkJoinTask<V> implements Future<V>, Serializable

從簽名上看,ForkJoinTask實現了future,也能夠序列化,但它不是一個Runnable或Callable.app

ForkJoinTask雖然能夠序列化,但它只對運行前和後敏感,對於執行過程當中不敏感.框架

先來看task的運行字段:異步

//volatie修飾的任務狀態值,由ForkJoinPool或工做線程修改.
volatile int status; 
static final int DONE_MASK   = 0xf0000000;//用於屏蔽完成狀態位. 
static final int NORMAL      = 0xf0000000;//表示正常完成,是負值.
static final int CANCELLED   = 0xc0000000;//表示被取消,負值,且小於NORMAL
static final int EXCEPTIONAL = 0x80000000;//異常完成,負值,且小於CANCELLED
static final int SIGNAL      = 0x00010000;//用於signal,必須不小於1<<16,默認爲1<<16.
static final int SMASK       = 0x0000ffff;//後十六位的task標籤

很顯然,DONE_MASK可以過濾掉全部非NORMAL,非CANCELLED,非EXCEPTIONAL的狀態,字段的含義也很直白,後面的SIGNAL和SMASK還不明確,後面再看.ide

//標記當前task的completion狀態,同時根據狀況喚醒等待該task的線程.
private int setCompletion(int completion) {
    for (int s;;) {
        //開啓一個循環,若是當前task的status已是各類完成(小於0),則直接返回status,這個status多是某一次循環前被其餘線程完成.
        if ((s = status) < 0)
            return s;
        //嘗試將原來的status設置爲它與completion按位或的結果.
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
            if ((s >>> 16) != 0)
                //此處體現了SIGNAL的標記做用,很明顯,只要task完成(包含取消或異常),或completion傳入的值不小於1<<16,
                //就能夠起到喚醒其餘線程的做用.
                synchronized (this) { notifyAll(); }
            //cas成功,返回參數中的completion.
            return completion;
        }
    }
}

前面用註釋解釋了這個方法的邏輯,顯然該方法是阻塞的,若是傳入的參數不能將status設置爲負值會如何?

顯然,可能會有至多一次的成功cas,而且若知足喚醒的條件,會嘗試去喚醒線程,甚至可能由於爲了喚醒其餘線程而被阻塞在synchonized代碼塊外;也可能沒有一次成功的cas,直到其餘線程成功將status置爲完成.

//final修飾,運行ForkJoinTask的核心方法.
final int doExec() {
    int s; boolean completed;
    //僅未完成的任務會運行,其餘狀況會忽略.
    if ((s = status) >= 0) {
        try {
            //調用exec
            completed = exec();
        } catch (Throwable rex) {
            //發生異常,用setExceptionalCompletion設置結果
            return setExceptionalCompletion(rex);
        }
        if (completed)
            //正常完成,調用前面說過的setCompletion,參數爲normal,並將返回值做爲結果s.
            s = setCompletion(NORMAL);
    }
    //返回s
    return s;
}

//記錄異常而且在符合條件時傳播異常行爲
private int setExceptionalCompletion(Throwable ex) {
    //首先記錄異常信息到結果
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
        //status去除非完成態標誌位(只保留前4位),等於EXCEPTIONAL.內部傳播異常
        internalPropagateException(ex);
    return s;
}
//internalPropagateException方法是一個空方法,留給子類實現,可用於completer之間的異常傳遞
void internalPropagateException(Throwable ex) {}
//記錄異常完成
final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        //只能是異常態的status能夠記錄.
        //hash值禁止重寫,不使用子類的hashcode函數.
        int h = System.identityHashCode(this);
        final ReentrantLock lock = exceptionTableLock;
        //異常鎖,加鎖
        lock.lock();
        try {
            //抹除髒異常,後面敘述
            expungeStaleExceptions();
            //異常表數組.ExceptionNode後面敘述.
            ExceptionNode[] t = exceptionTable;//exceptionTable是一個全局的靜態常量,後面敘述
            //用hash值和數組長度進行與運算求一個初始的索引
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                //找到空的索引位,就建立一個新的ExceptionNode,保存this,異常對象並退出循環
                if (e == null) {
                    t[i] = new ExceptionNode(this, ex, t[i]);//(1)
                    break;
                }
                if (e.get() == this) //已設置在相同的索引位置的鏈表中,退出循環.//2
                    break;
            //不然e指向t[i]的next,進入下個循環,直到發現判斷包裝this這個ForkJoinTask的ExceptionNode已經出如今t[i]這個鏈表並break(2),
            //或者直到e是null,意味着t[i]出發開始的鏈表並沒有包裝this的ExceptionNode,則將構建一個新的ExceptionNode並置換t[i],
            //將原t[i]置爲它的next(1).整個遍歷判斷和置換過程處在鎖中進行.
            }
        } finally {
            lock.unlock();
        }
        //記錄成功,將當前task設置爲異常完成.
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}

//exceptionTable聲明
private static final ExceptionNode[] exceptionTable;//全局異常node表
private static final ReentrantLock exceptionTableLock;//上面用到的鎖,就是一個普通的可重入鎖.
private static final ReferenceQueue<Object> exceptionTableRefQueue;//變量表引用隊列,後面詳述.
private static final int EXCEPTION_MAP_CAPACITY = 32;//異常表的固定容量,不大,只有32並且是全局的.

//初始化在一個靜態代碼塊.
static {
    exceptionTableLock = new ReentrantLock();
    exceptionTableRefQueue = new ReferenceQueue<Object>();
    exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinTask.class;
        STATUS = U.objectFieldOffset
            (k.getDeclaredField("status"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

//先來看ExceptionNode內部類的實現
//簽名,實現了一個ForkJoinTask的弱引用.
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
    final Throwable ex;
    ExceptionNode next;
    final long thrower;  // use id not ref to avoid weak cycles
    final int hashCode;  // store task hashCode before weak ref disappears
    ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
        super(task, exceptionTableRefQueue);//指向弱引用的構造函數,保存引用爲task,隊列爲全局的exceptionTableRefQueue.
        this.ex = ex;//拋出的異常的引用
        this.next = next;//數組中的ExceptionNode以鏈表形式存在,前面分析過,先入者爲後入者的next
        this.thrower = Thread.currentThread().getId();//保存拋出異常的線程id(嚴格來講是建立了this的線程)
        this.hashCode = System.identityHashCode(task);//哈希碼保存關聯task的哈希值.
    }
}
//清除掉異常表中的髒數據,僅在持有全局鎖時纔可以使用.前面看到在記錄新的異常信息時要進行一次清除嘗試
private static void expungeStaleExceptions() {
    //循環條件,全局exceptionTableRefQueue隊列不爲空,前面說過ExceptionNode是弱引用,當它被回收時會被放入此隊列.
    for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
        //從隊首依次取出元素.
        if (x instanceof ExceptionNode) {
            //計算在全局exceptionTable中的索引.
            int hashCode = ((ExceptionNode)x).hashCode;
            ExceptionNode[] t = exceptionTable;
            int i = hashCode & (t.length - 1);
            //取出node
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            //不停遍歷,直到e是null爲止.
            while (e != null) {
                //e的next
                ExceptionNode next = e.next;//2
                //x是隊首出隊的元素.它與e相等說明找到
                if (e == x) {
                    //e是一個鏈表的元素,pred表示它是否有前置元素
                    if (pred == null)
                        //無前置元素,說明e在鏈表首部,直接將首部元素指向next便可.
                        t[i] = next;
                    else
                        //有前置元素,說明循環過若干次,將當前e出鏈表
                        pred.next = next;
                    //在鏈表中發現x即break掉內循環,繼續從exceptionTableRefQueue的隊首彈出新的元素.
                    break;
                }
                //只要發現當前e不是x,準備下一次循環,pred指向e.e指向next,進行下一個元素的比較.
                pred = e;
                e = next;
            }
        }
    }
}

到此doExec(也是每一個ForkJoinTask的執行核心過程)就此結束.

很明顯,ForkJoinTask的doExec負責了核心的執行,它留下了exec方法給子類實現,而重點負責了後面出現異常狀況的處理.處理的邏輯前面已論述,在產生異常時嘗試將異常存放在全局的execptionTable中,存放的結構爲數組+鏈表,按哈希值指定索引,每次存放新的異常時,順便清理上一次已被gc回收的ExceptionNode.全部ForkJoinTask共享了一個exceptionTable,所以必然在有關的幾個環節要進行及時的清理.除了剛剛論述的過程,還有以下的幾處:

clipboard.png

前面論述了recordExceptionalCompletion,一共有四處使用了expungeStaleException,將已回收的ExceptionNode從引用隊列中清除.

clearExceptionalCompletion在對一個ForkJoinTask從新初始化時使用,咱們在前面提到序列化時說過,ForkJoinTask的序列化結果只保留了兩種狀況:運行前,運行結束.從新初始化一個ForkJoinTask,就要去除任何中間狀態,包含自身產出的已被回收的異常node,而expungeStaleExceptions顯然也順便幫助其餘task清除.

getThrowableException是查詢task運行結果時調用,如一些get/join方法,很明顯,記錄這個異常的做用就在於返回給get/join,在這一塊順便清理已被回收的node,尤爲是將本身運行時生成的node清除.

helpExpungeStaleExceptions是提供給ForkJoinPool在卸載worker時使用,順便幫助清理全局異常表.

使用它們的方法稍後再論述,先來繼續看ForkJoinTask的源碼.

//內部等待任務完成,直到完成或超時.
final void internalWait(long timeout) {
    int s;
    //status小於0表明已完成,直接忽略wait.
    //未完成,則試着加上SIGNAL的標記,令完成任務的線程喚醒這個等待.
    if ((s = status) >= 0 && 
        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
        //加鎖,只有一個線程能夠進入.
        synchronized (this) {
            //再次判斷未完成.等待timeout,且忽略擾動異常.
            if (status >= 0)
                try { wait(timeout); } catch (InterruptedException ie) { }
            else
                //已完成則響醒其餘等待者.
                notifyAll();
        }
    }
}

internalWait方法邏輯很簡單,首先判斷是否未完成,知足未完成,則將標記位加上SIGNAL(可能已有別的線程作過),隨後加鎖double check status,還未完成則等待並釋放鎖,若發現已完成,或在後續被喚醒後發現已完成,則喚醒其餘等待線程.經過notifyAll的方式避免了通知丟失.

同時,它的使用方法目前只有一個ForkJoinPool::awaitJoin,在該方法中使用循環的方式進行internalWait,知足了每次按截止時間或週期進行等待,同時也順便解決了虛假喚醒.

繼續看externalAwaitDone函數.它體現了ForkJoin框架的一個核心:外部幫助.

//外部線程等待一個common池中的任務完成.
private int externalAwaitDone() {
    int s = ((this instanceof CountedCompleter) ? 
    //當前task是一個CountedCompleter,嘗試使用common ForkJoinPool去外部幫助完成,並將完成狀態返回.
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this, 0) :
            //當前task不是CountedCompleter,則調用common pool嘗試外部彈出該任務並進行執行,
            //status賦值doExec函數的結果,若彈出失敗(其餘線程先行彈出)賦0.
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {
        //檢查上一步的結果,即外部使用common池彈出並執行的結果(不是CountedCompleter的狀況),或外部嘗試幫助CountedCompleter完成的結果
        //status大於0表示嘗試幫助完成失敗.
        //擾動標識,初值false
        boolean interrupted = false;
        do {
            //循環嘗試,先給status標記SIGNAL標識,便於後續喚醒操做.
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            //CAS成功,進同步塊發現double check未完成,則等待.
                            wait(0L);
                        } catch (InterruptedException ie) {
                            //若在等待過程當中發生了擾動,不中止等待,標記擾動.
                            interrupted = true;
                        }
                    }
                    else
                        //進同步塊發現已完成,則喚醒全部等待線程.
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);//循環條件,task未完成.
        if (interrupted)
            //循環結束,若循環中間曾有擾動,則中斷當前線程.
            Thread.currentThread().interrupt();
    }
    //返回status
    return s;
}

externalAwaitDone的邏輯不復雜,在當前task爲ForkJoinPool.common的狀況下能夠在外部進行等待和嘗試幫助完成.方法會首先根據ForkJoinTask的類型進行嘗試幫助,並返回當前的status,若發現未完成,則進入下面的等待喚醒邏輯.該方法的調用者爲非worker線程.

類似的方法:externalInterruptibleAwaitDone

private int externalInterruptibleAwaitDone() throws InterruptedException {
    int s;
    //不一樣於externalAwaitDone,入口處發現當前線程已中斷,則當即拋出中斷異常.
    if (Thread.interrupted())
        throw new InterruptedException();
    if ((s = status) >= 0 &&
        (s = ((this instanceof CountedCompleter) ?
              ForkJoinPool.common.externalHelpComplete(
                  (CountedCompleter<?>)this, 0) :
              ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
              0)) >= 0) {
        while ((s = status) >= 0) {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0)
                        //wait時也不catch中斷異常,發生即拋出.
                        wait(0L);
                    else
                        notifyAll();
                }
            }
        }
    }
    return s;
}

externalInterruptibleAwaitDone的邏輯與externalAwaitDone類似,只是對中斷異常的態度爲拋,後者爲catch.

它們的使用點,externalAwaitDone爲doJoin或doInvoke方法調用,externalInterruptibleAwaitDone爲get方法調用,很明顯,join操做不可擾動,get則能夠擾動.

下面來看看doJoin和doInvoke

//join的核心方法
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    //已完成,返回status,未完成再嘗試後續
    return (s = status) < 0 ? s :
        //未完成,當前線程是ForkJoinWorkerThread,從該線程中取出workQueue,並嘗試將
        //當前task出隊而後執行,執行的結果是完成則返回狀態,不然使用當線程池所在的ForkJoinPool的awaitJoin方法等待.
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        //當前線程不是ForkJoinWorkerThread,調用前面說的externalAwaitDone方法.
        externalAwaitDone();
}

//invoke的核心方法
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    //先嚐試本線程執行,不成功才走後續流程
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        //與上一個方法基本相同,但在當前線程是ForkJoinWorkerThread時不嘗試將該task移除棧並執行,而是等
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

到此終於能夠看一些公有對外方法了.有了前面的基礎,再看get,join,invoke等方法很是簡單.

//get方法還有get(long time)的變種.
public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        //當前線程是ForkJoinWorkerThread則調用前面提過的doJoin方法.
        //不然調用前述externalInterruptibleAwaitDone
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        //異常處理,取消的任務,拋出CancellationException.
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        //異常處理,調用getThrowableException獲取異常,封進ExecutionException.
        throw new ExecutionException(ex);
    //無異常處理,返回原始結果.
    return getRawResult();
}
//getRawResult默認爲一個抽象實現,在ForkJoinTask中,並未保存該結果的字段.
 public abstract V getRawResult();

//getThrowableException方法
private Throwable getThrowableException() {
    //不是異常標識,直接返回null,從方法名的字面意思看,要返回一個可拋出的異常.
    if ((status & DONE_MASK) != EXCEPTIONAL)
        return null;
    //系統哈希碼來定位ExceptionNode
    int h = System.identityHashCode(this);
    ExceptionNode e;
    final ReentrantLock lock = exceptionTableLock;
    //加異常表全局鎖
    lock.lock();
    try {
        //先清理已被回收的異常node,前面已述.
        expungeStaleExceptions();
        ExceptionNode[] t = exceptionTable;
        e = t[h & (t.length - 1)];
        //循環找出this匹配的異常node
        while (e != null && e.get() != this)
            e = e.next;
    } finally {
        lock.unlock();
    }
    Throwable ex;
    //前面找不出異常node或異常node中存放的異常爲null,則返回null
    if (e == null || (ex = e.ex) == null)
        return null;
    if (e.thrower != Thread.currentThread().getId()) {
        //不是當前線程拋出的異常.
        Class<? extends Throwable> ec = ex.getClass();
        try {
            Constructor<?> noArgCtor = null;//該異常的無參構造器
            Constructor<?>[] cs = ec.getConstructors();//該異常類公有構造器
            for (int i = 0; i < cs.length; ++i) {
                Constructor<?> c = cs[i];
                Class<?>[] ps = c.getParameterTypes();
                if (ps.length == 0)
                    //構建器參數列表長度0說明存在無參構造器,存放.
                    noArgCtor = c;
                else if (ps.length == 1 && ps[0] == Throwable.class) {
                    //發現有參構造器且參數長度1且第一個參數類型是Throwable,說明能夠存放cause.
                    //反射將前面取出的ex做爲參數,反射調用該構造器建立一個要拋出的Throwable.
                    Throwable wx = (Throwable)c.newInstance(ex);
                    //反射失敗,異常會被catch,返回ex,不然返回wx.
                    return (wx == null) ? ex : wx;
                }
            }
            if (noArgCtor != null) {
                //在嘗試了尋找有參無參構造器,並發現只存在無參構造器的狀況,用無參構造器初始化異常.
                Throwable wx = (Throwable)(noArgCtor.newInstance());
                if (wx != null) {
                    //將ex設置爲它的cause並返回它的實例.
                    wx.initCause(ex);
                    return wx;
                }
            }
        } catch (Exception ignore) {
            //此方法不可拋出異常,必定要成功返回.
        }
    }
    //有參無參均未成功,返回找到的異常.
    return ex;
}

//join公有方法
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        //調用doJoin方法阻塞等待的結果不是NORMAL,說明有異常或取消.報告異常.
        reportException(s);
    //等於NORMAL,正常執行完畢,返回原始結果.
    return getRawResult();
}
//報告異常,可在前一步判斷執行status是否爲異常態,而後獲取並重拋異常.
private void reportException(int s) {
    //參數s必須用DONE_MASK處理掉前4位之後的位.
    if (s == CANCELLED)
        //傳入的狀態碼等於取消,拋出取消異常.
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        //使用前面的getThrowableException方法獲取異常並從新拋出.
        rethrow(getThrowableException());
}

//invoke公有方法.
public final V invoke() {
    int s;
    //先嚐試執行
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        //doInvoke方法的結果status只保留完成態位表示非NORMAL,則報告異常.
        reportException(s);
    //正常完成,返回原始結果.
    return getRawResult();
}

終於,讀到此處的讀者將關鍵的方法線串了起來,前述的全部內部方法,常量和變量與公有接口的關係已經明瞭.

很顯然,ForkJoinTask是個抽象類,且它並未保存任務的完成結果,也不負責這個結果的處理,但聲明並約束了返回結果的抽象方法getRawResult供子類實現.

所以,ForkJoinTask的自身關注任務的完成/異常/未完成,子類關注這個結果的處理.

每當獲取到任務的執行狀態時,ForkJoinTask可根據status來判斷是不是異常/正常完成,並進入相應的處理邏輯,最終使用子類實現的方法完成一個閉環.

若是理解爲將ForkJoinTask和子類的有關代碼合併起來,在結果/完成狀態/異常信息這一塊,至關於同時有三個part在合做.

第一個part:status字段,它同時表示了未完成/正常完成/取消/異常完成等狀態,也同時告訴有關等待線程是否要喚醒其餘線程(每一個線程等待前會設置SIGNAL),同時留出了後面16位對付其餘狀況.

第二個part:result,在ForkJoinTask見不到它,也沒有相應的字段,子類也未必須要提供這個result字段,前面提到的CountedCompleter就沒有提供這個result,它的getRawResult會固定返回null.可是CountedCompleter能夠繼承子類並實現這個result的保存與返回(道格大神在註釋中舉出了若干典型代碼例子),在JAVA8中,stream api中的並行流也會保存每一步的計算結果,並對結果進行合併.

第三個part:異常.在ForkJoinTask中已經完成了全部異常處理流程和執行流程的定義,重點在於異常的存放,它是由ForkJoinTask的類變量進行存放的,結構爲數組+鏈表,且元素利用了弱引用,借gc幫助清除掉已經被回收的ExceptionNode,顯然在gc以前必須獲得使用.而異常隨時能夠發生並進行record入列,但相應的能消費掉這個異常的只有相應的外部的get,join,invoke等方法或者內部擴展了exec()等方式,獲得其餘線程執行的task異常結果的狀況.巧妙的是,只有外部調用者調用(get,invoke,join)時,這個異常信息才足夠重要,須要rethrow出去並保存關鍵的堆棧信息;而內部線程在訪問一些非自身執行的任務時,每每只須要status判斷是否異常便可,在exec()中fork新任務的,也每每必須當即join這些新的子任務,這就保證了可以及時獲得子任務中的異常堆棧(即便拿不到堆棧也知道它失敗了).

通過前面的論述,ForkJoinTask的執行和異常處理已經基本論結,可是,一個ForkJoinTask在建立以後是如何運行的?顯然,它不是一個Runnable,也不是Callable,不能直接submit或execute到普通的線程池.

臨時切換到ForkJoinPool的代碼,前面提到過,ForkJoinTask的官方定義就是能夠運行在ForkJoinPool中的task.

//ForkJoinPool代碼,submit一個ForkJoinTask到ForkJoinPool,並將該task自身返回.
//拿到返回的task,咱們就能夠進行前述的get方法了.
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}
//execute,不返回.相似普通線程池提交一個runnable的行爲.
public void execute(ForkJoinTask<?> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
}

顯然,若要使用一個自建的ForkJoinPool,可使用execute或submit函數提交入池,而後用前述的get方法和變種方法進行.這是一種運行task的方式.

前面論述過的invoke方法會先去先去嘗試本地執行,而後纔去等待,故咱們本身new一個ForkJoinTask,同樣能夠經過invoke直接執行,這是第二種運行task的方式.

前面論述的join方法在某種狀況下也是一種task的運行方式,在當前線程是ForkJoinWorkerThread時,會去嘗試將task出隊並doExec,也就是會先用本線程執行一次,不成功才幹等,非ForkJoinWorkerThread則直接乾等了.顯然咱們能夠本身構建一個ForkJoinWorkerThread並去join,這時會將任務出隊並執行(但存在一個問題:何時入隊).且出隊後若未執行成功,則awaitJoin(參考ForkJoinPool::awaitJoin),此時因任務已出隊,不會被竊取或幫助(在awaitJoin中會有helpStealer,但其實任務是當前線程本身"偷走"了),彷佛徹底要靠本身了.但並不表示ForkJoinTask子類沒法獲取這個已出隊的任務,好比CountedCompleter使用時,能夠在compute中新生成的Completer時,將源CountedCompleter(ForkJoinTask的子類)做爲新生成的CountedCountedCompleter的completer(該子類中的一個字段),這樣,如有一個ForkJoinWorkerThread竊取了這個新生成的CountedCompleter,能夠經過completer鏈表找到先前被出隊的CountedCompleter(ForkJoinTask).關於CountedCompleter單獨文章詳述.

除此以外呢?包含前面提到的使用join操做不是ForkJoinWorkerThread調用的狀況,不使用ForkJoinPool的submit execute入池,如何能讓一個ForkJoinTask在未來執行?咱們來看後面的方法.

//fork方法,將當前任務入池.
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        //若是當前線程是ForkJoinWorkerThread,將任務壓入該線程的任務隊列.
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        //不然調用common池的externalPush方法入隊.
        ForkJoinPool.common.externalPush(this);
    return this;
}

顯然,咱們還能夠經過對一個ForkJoinTask進行fork方法入池,入哪一個池徹底取決於當前線程的類型.這是第四種讓任務能被運行的方式.

一樣,咱們也看到了第五種方式,ForkJoinPool.common其實就是一個常量保存的ForkJoinPool,它可以調用externalPush,咱們天然也能夠直接new一個ForkJoinPool,而後將當前task進行externalPush,字面意思外部壓入.這種辦法,非ForkJoinWorkerThread也能將任務提交到非common的ForkJoinPool.

從名字來看,ForkJoinTask彷佛已經說明了一切,按照官方的註釋也是如此.對一個task,先Fork壓隊,再Join等待執行結果,這是一個ForkJoinTask的執行週期閉環(但不要簡單理解爲生命週期,前面提到過,任務能夠被從新初始化,並且從新初始化時還會清空ExceptionNode數組上的已回收成員).

到此爲止,ForkJoinTask的核心函數和api已經基本瞭然,其它同類型的方法以及周邊的方法均不難理解,如invokeAll的各類變種.下面來看一些"周邊"類型的函數.有前述的基礎,它們很好理解.

//取消一個任務的執行,直接將status設置成CANCELLED,設置後判斷該status 是否爲CANCELLED,是則true不然false.
public boolean cancel(boolean mayInterruptIfRunning) {
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

//判斷是否完成,status小於0表明正常完成/異常完成/取消,很好理解.
public final boolean isDone() {
    return status < 0;
}

//判斷當前任務是否取消.
public final boolean isCancelled() {
    //status前4位
    return (status & DONE_MASK) == CANCELLED;
}
public final boolean isCompletedAbnormally() {
    //是否爲異常完成,前面說過,CANCELLED和EXCEPTIONAL均小於NORMAL
    return status < NORMAL;
}
//是否正常完成.
public final boolean isCompletedNormally() {
    //完成態位等於NORMAL
    return (status & DONE_MASK) == NORMAL;
}
//獲取異常.
 public final Throwable getException() {
    int s = status & DONE_MASK;
    //當爲正常完成或未完成時,返回null.
    return ((s >= NORMAL)    ? null :
            //是取消時,新建一個取消異常.
            (s == CANCELLED) ? new CancellationException() :
            //不是取消,參考前面提到的getThrowableException.
            getThrowableException());
}
//使用異常完成任務.
 public void completeExceptionally(Throwable ex) {
    //參考前述的setExceptionalCompletion,
    //ex已是運行時異常或者Error,直接使用ex完成,如果受檢異常,包裝成運行時異常.
    setExceptionalCompletion((ex instanceof RuntimeException) ||
                             (ex instanceof Error) ? ex :
                            new RuntimeException(ex));
   }
//使用value完成任務.
public void complete(V value) {
    try {
        //設置原始結果,它是一個空方法.前面說過ForkJoinTask沒有維護result之類的結果字段,子類可自行發揮.
        setRawResult(value);
    } catch (Throwable rex) {
        //前述步驟出現異常,就用異常方式完成.
        setExceptionalCompletion(rex);
        return;
    }
    //前面的結果執行完,標記當前爲完成.
    setCompletion(NORMAL);
}
//安靜完成任務.直接用NORMAL setCompletion,沒什麼好說的.
public final void quietlyComplete() {
    setCompletion(NORMAL);
}

//安靜join,它不會返回result也不會拋出異常.處理集合任務時,若是須要全部任務都被執行而不是一個執行出錯(取消)其餘也跟着出錯的狀況下,
//很明顯適用,這不一樣於invokeAll,靜態方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)會在任何一個任務出現異常後取消執行並拋出.
public final void quietlyJoin() {
    doJoin();
}

//安靜執行一次,不返回結果不拋出異常,沒什麼好說的.
public final void quietlyInvoke() {
    doInvoke();
}
//從新初臺化當前task
public void reinitialize() {
    if ((status & DONE_MASK) == EXCEPTIONAL)
        //若是當前任務是異常完成的,清除異常.該方法參考前面的論述.
        clearExceptionalCompletion();
    else
        //不然重置status爲0.
        status = 0;
}
//反fork.
public boolean tryUnfork() {
    Thread t;
    //當前線程是ForkJoinWorkerThread,從它的隊列嘗試移除.
    return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
            //當前線程不是ForkJoinWorkerThread,用common池外部移除.
            ForkJoinPool.common.tryExternalUnpush(this));
}

上面是一些簡單的周邊方法,大多並不須要再論述了,unfork方法很明顯在某些場景下不會成功,顯然,當一個任務剛剛入隊並未進行後續操做時,極可能成功.按前面所述,當對一個任務進行join時,可能會成功的彈出當前任務並執行,此時不可能再次彈出;當一個任務被其餘線程竊取或被它自己執行的也不會彈出.

再來看一些老朋友,在前面的文章"CompletableFuture和響應式編程"一文中,做者曾着重強調過它將每一個要執行的動做進行壓棧(未能當即執行的狀況),而棧中的元素Completion便是ForkJoinTask的子類,而標記該Completion是否被claim的方法和周邊方法以下:

//獲取ForkJoinTask的標記,返回結果爲short型
public final short getForkJoinTaskTag() {
    //status的後16位
    return (short)status;
}


//原子設置任務的標記位.
public final short setForkJoinTaskTag(short tag) {
    for (int s;;) {
        //不停循環地嘗試將status的後16位設置爲tag.
        if (U.compareAndSwapInt(this, STATUS, s = status,
                                //替換的結果,前16位爲原status的前16位,後16位爲tag.
                                (s & ~SMASK) | (tag & SMASK)))
            //返回被換掉的status的後16位.
            return (short)s;
    }
}


//循環嘗試原子設置標記位爲tag,前提是原來的標記位等於e,成功true失敗false
public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
    for (int s;;) {
        if ((short)(s = status) != e)
            //若是某一次循環的原標記位不是e,則返回false.
            return false;
        //同上個方法
        if (U.compareAndSwapInt(this, STATUS, s,
                                (s & ~SMASK) | (tag & SMASK)))
            return true;
    }
}

還記得CompletableFuture在異步執行Completion時要先claim嗎?claim方法中,會嘗試設置這個標記位.這是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.

目前來看,在CompletableFuture的內部實現Completion尚未使用到ForkJoinTask的其餘屬性,好比放入一個ForkJoinPool執行(沒有任何前面總結的調用,好比用ForkJoinPool的push,execute,submit等,也沒有fork到common池).可是很明顯,道格大神令它繼承自ForkJoinTask不可能純粹只爲了使用區區一個標記位,試想一下,在如此友好支持響應式編程的CompletableFuture中傳入的每個action均可以生成若干新的action,那麼CompletableFuture負責將這些action封裝成Completion放入ForkJoinPool執行,將最大化利用到ForkJoin框架的工做竊取和外部幫助的功效,強力結合分治思想,這將是多麼優雅的設計.或者在jdk9-12中已經出現了相應的Completion實現(儘管做者寫過JAVA9-12,遺憾的是也沒有去翻它們的源碼).

另外,儘管Completion的衆多子類也沒有result之類的表示結果的字段,但它的一些子類經過封裝,實際上間接地將這個Completion所引用的dep的result做爲了本身的"result",固然,getRawResult依舊是null,可是理念倒是相通的.

以上是ForkJoinTask的部分核心源碼,除了上述的源碼外,還有一些同屬於ForkJoinTask的核心源碼部分,好比其餘的public方法(參考join fork invoke 便可),一些利用ForkJoinPool的實現,要深刻了解ForkJoinPool才能瞭解的方法,一些不太難的靜態方法等,這些沒有必要論述了.

除了核心源碼外,ForkJoinTask也提供了對Runnable,Callable的適配器實現,這塊很好理解,簡單看一看.

//對Runnable的實現,若是在ForkJoinPool中提交一個runnable,會用它封裝成ForkJoinTask
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
    implements RunnableFuture<T> {
    final Runnable runnable;
    T result;
    AdaptedRunnable(Runnable runnable, T result) {
        //不能沒有runnable
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
        //對runnable作適配器時,能夠提交將結果傳入,並設置爲當前ForkJoinTask子類的result.
        //前面說過,ForkJoinTask不以result做爲完成標記,判斷一個任務是否完成或異常,使用status足以,
        //返回的結果才使用result.
        this.result = result; 
    }
    public final T getRawResult() { return result; }
    public final void setRawResult(T v) { result = v; }
    //前面說過提交入池的ForkJoinTask最終會運行doExec,而它會調用exec,此處會調用run.
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;//序列化用
}

//無結果的runnable適配器
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    //區別就是result固定爲null,也不能set
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}


//對runnable的適配器,但強制池中的工做線程在執行任務發現異常時拋出
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    //默認null結果,set也是空實現
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        //前面說過doExec會被執行,它會調exec並catch,在catch塊中設置當前任務爲異常完成態,
        //而後調用internalPropagateException方法,而在ForkJoinTask中默認爲空實現.
        //此處將異常從新拋出,將形成worker線程拋出異常.
        rethrow(ex);
    }
    private static final long serialVersionUID = 5232453952276885070L;
}


//對callable的適配器,當將callable提交至ForkJoinPool時使用.
static final class AdaptedCallable<T> extends ForkJoinTask<T>
    implements RunnableFuture<T> {
    final Callable<? extends T> callable;
    T result;
    AdaptedCallable(Callable<? extends T> callable) {
        if (callable == null) throw new NullPointerException();
        this.callable = callable;
    }
    //字段中有一個result,直接使用它返回.
    public final T getRawResult() { return result; }
    //result可外部直接設置.
    public final void setRawResult(T v) { result = v; }
    public final boolean exec() {
        try {
            //默認的result用call函數設置.
            result = callable.call();
            return true;
        
        } catch (Error err) {
            //catch住Error,拋出
            throw err;
        } catch (RuntimeException rex) {
            //catch住運行時異常,拋出
            throw rex;
        } catch (Exception ex) {
            //catch住受檢異常,包裝成運行時異常拋出.
            throw new RuntimeException(ex);
        }
    }
    //run方法同樣只是調用invoke,進而調用doExec.
    public final void run() { invoke(); }
    private static final long serialVersionUID = 2838392045355241008L;
}

//runnable生成適配器的工具方法
public static ForkJoinTask<?> adapt(Runnable runnable) {
    return new AdaptedRunnableAction(runnable);
}

//指定結果設置runnable的適配器工具方法
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
    return new AdaptedRunnable<T>(runnable, result);
}

//對callable生成適配器的方法.
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
    return new AdaptedCallable<T>(callable);
}

以上的代碼都不復雜,只要熟悉了ForkJoinTask的自己代碼結構,對於這一塊瞭解很是容易,這也間接說明了ForkJoinPool中是如何處理Runnable和Callable的(由於ForkJoinPool自己也是一種線程池,能夠接受提交Callable和Runnable).

將runnable提交到pool時,能夠指定result,也能夠不指定,也能夠用submit或execute方法區分異常處理行爲,ForkJoinPool會自行選擇相應的適配器.

將callable 提交到pool時,pool會選擇對callable的適配器,它的結果將爲task的結果,它的異常將爲task的異常.

到此爲止,ForkJoinTask的源碼分析完成.

後語

本文詳細分析了ForkJoinTask的源碼,並解釋了前文CompletableFuture中Completion與它的關聯,以及分析了Completion繼承自ForkJoinTask目前已帶來的功能利用(tag)和未來可能增長的功用(一個Completion產生若干多個Completion並在ForkJoinPool中運行,還支持工做竊取).

同時本文也對ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream api中的並行流進行了略微的描述.

在文章的最後,或許有一些新手讀者會好奇,咱們究竟何時會使用ForkJoinTask?

首先,若是你在項目中大肆使用了流式計算,並使用了並行流,那麼你已經在使用了.

前面提過,官方解釋ForkJoinTask能夠視做比線程輕量許多的實體,也是輕量的Future.結合在源碼中時不時出來秀存在感的ForkJoinWorkerThread,顯然它就是聽說比普通線程輕量一些的線程,在前面的源碼中能夠看出,它維護了一組任務的隊列,每一個線程負責完成隊列中的任務,也能夠偷其餘線程的任務,甚至池外的線程均可以時不時地來個join,順便幫助出隊執行任務.

顯然,對於重計算,輕io,輕阻塞的任務,適合使用ForkJoinPool,也就使用了ForkJoinTask,你不會認爲它能夠提交runnable和callable,就能夠不用ForkJoinTask了吧?前面的適配器ForkJoinPool在這種狀況下必用的,能夠去翻相應的源碼.

本章沒有去詳述CountedCompleter,但前面論述時說過,你能夠在exec()中將一個計算複雜的任務拆解爲小的子任務,而後將子任務入池執行,父任務合併子任務的結果.這種分治的算法此前基本是在單線程模式下運行,使用ForkJoinTask,則能夠將這種計算交給一個ForkJoinPool中的全部線程並行執行.

相關文章
相關標籤/搜索