點贊再看,養成習慣,公衆號搜一搜【一角錢技術】關注更多原創技術文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有個人系列文章。java
前面咱們介紹了線程池框架(ExecutorService)的兩個具體實現:git
線程池爲線程生命週期的開銷和資源不足問題提供瞭解決方案。經過對多個任務重用線程,線程建立的開銷被分攤到多個任務上。Java7 又提供了的一個用於並行執行的任務的框架 Fork/Join ,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。在介紹Fork/Join 框架以前咱們先了解幾個概念:CPU密集型、IO密集型,再逐步深刻去認識Fork/Join 框架。github
CPU密集型也叫計算密集型,指的是系統的硬盤、內存性能相對於CPU要好很好多,此時,系統運做大部分的情況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內存),I/O在很短的時間就能夠完成,而CPU還有許多運算要處理,CPU Loading很高。web
在多重程序系統中,大部分時間用來作計算、邏輯判斷等CPU動做的程序稱之 CPU bound。例如一個計算圓周率至小數點一千位如下的程序,在執行的過程中絕大部分時間在用三角函數和開根號的計算,即是屬於CPU bound的程序。算法
CPU bound的程序通常而言CPU佔用率至關高。這多是由於任務自己不太須要訪問I/O設備,也多是由於程序是多線程實現所以屏蔽了等待I/O的時間。編程
線程數通常設置爲:線程數 = CPU核數 + 1(現代CPU支持超線程)數組
I/O密集型指的是系統的CPU性能相對硬盤、內存要好不少,此時,系統運做,大部分的情況是 CPU 在等 I/O(硬盤/內存)的讀/寫操做,此時 CPU Loading 並不高。markdown
I/O bound的程序通常在達到性能極限時,CPU佔用率仍然較低。這多是由於任務自己須要大量I/O操做,而 pipeline 作的不是很好,沒有充分利用處理器能力。網絡
線程數通常設置爲:線程數 = ((線程等待時間 + 線程CPU時間) / 線程CPU時間) * CPU數目數據結構
咱們能夠把任務分爲計算密集型和I/O密集型。
計算密集型任務的特色是要進行大量的計算,消耗CPU資源,好比計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也能夠用多任務完成,可是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,因此,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。
計算密集型任務因爲主要消耗CPU資源,所以,代碼運行效率相當重要。Python這樣的腳本語言運行效率很低,徹底不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。
第二種任務的類型是I/O密集型,涉及到網絡、磁盤I/O的任務都是I/O密集型任務,這類任務的特色是CPU消耗不多,任務的大部分時間都在等待I/O操做完成(由於I/O的速度遠遠低於CPU和內存的速度)。對於I/O密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是I/O密集型任務,好比Web應用。
I/O密集型任務執行期間,99%的時間都花在I/O上,花在CPU上的時間不多,所以,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,徹底沒法提高運行效率。對於I/O密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
Fork/Join 框架是 Java7 提供了的一個用於並行執行的任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。
Fork 就是把一個大任務切分爲若干個子任務並行的執行,Join 就是合併這些子任務的執行結果,最後獲得這個大任務的結果。好比計算 1+2+......+10000,能夠分割成10個子任務,每一個子任務對1000個數進行求和,最終彙總這10個子任務的結果。以下圖所示: Fork/Join的特性:
關於「分而治之」的算法,能夠查看《分治、回溯的實現和特性》
工做竊取(work-stealing)算法 是指某個線程從其餘隊列裏竊取任務來執行。
咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。
可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。
使用場景示例
定義fork/join任務,以下示例,隨機生成2000w條數據在數組當中,而後求和_
package com.niuh.forkjoin.recursivetask;
import java.util.concurrent.RecursiveTask;
/** * RecursiveTask 並行計算,同步有返回值 * ForkJoin框架處理的任務基本都能使用遞歸處理,好比求斐波那契數列等,但遞歸算法的缺陷是: * 一隻會只用單線程處理, * 二是遞歸次數過多時會致使堆棧溢出; * ForkJoin解決了這兩個問題,使用多線程併發處理,充分利用計算資源來提升效率,同時避免堆棧溢出發生。 * 固然像求斐波那契數列這種小問題直接使用線性算法搞定可能更簡單,實際應用中徹底不必使用ForkJoin框架, * 因此ForkJoin是核彈,是用來對付你們夥的,好比超大數組排序。 * 最佳應用場景:多核、多內存、能夠分割計算再合併的計算密集型任務 */
class LongSum extends RecursiveTask<Long> {
//任務拆分的最小閥值
static final int SEQUENTIAL_THRESHOLD = 1000;
static final long NPS = (1000L * 1000 * 1000);
static final boolean extraWork = true; // change to add more than just a sum
int low;
int high;
int[] array;
LongSum(int[] arr, int lo, int hi) {
array = arr;
low = lo;
high = hi;
}
/** * fork()方法:將任務放入隊列並安排異步執行,一個任務應該只調用一次fork()函數,除非已經執行完畢並從新初始化。 * tryUnfork()方法:嘗試把任務從隊列中拿出單獨處理,但不必定成功。 * join()方法:等待計算完成並返回計算結果。 * isCompletedAbnormally()方法:用於判斷任務計算是否發生異常。 */
protected Long compute() {
if (high - low <= SEQUENTIAL_THRESHOLD) {
long sum = 0;
for (int i = low; i < high; ++i) {
sum += array[i];
}
return sum;
} else {
int mid = low + (high - low) / 2;
LongSum left = new LongSum(array, low, mid);
LongSum right = new LongSum(array, mid, high);
left.fork();
right.fork();
long rightAns = right.join();
long leftAns = left.join();
return leftAns + rightAns;
}
}
}
複製代碼
執行fork/join任務
package com.niuh.forkjoin.recursivetask;
import com.niuh.forkjoin.utils.Utils;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class LongSumMain {
//獲取邏輯處理器數量
static final int NCPU = Runtime.getRuntime().availableProcessors();
/** * for time conversion */
static final long NPS = (1000L * 1000 * 1000);
static long calcSum;
static final boolean reportSteals = true;
public static void main(String[] args) throws Exception {
int[] array = Utils.buildRandomIntArray(2000000);
System.out.println("cpu-num:" + NCPU);
//單線程下計算數組數據總和
long start = System.currentTimeMillis();
calcSum = seqSum(array);
System.out.println("seq sum=" + calcSum);
System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
//採用fork/join方式將數組求和任務進行拆分執行,最後合併結果
LongSum ls = new LongSum(array, 0, array.length);
ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數
ForkJoinTask<Long> task = fjp.submit(ls);
System.out.println("forkjoin sum=" + task.get());
System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
if (task.isCompletedAbnormally()) {
System.out.println(task.getException());
}
fjp.shutdown();
}
static long seqSum(int[] array) {
long sum = 0;
for (int i = 0; i < array.length; ++i) {
sum += array[i];
}
return sum;
}
}
複製代碼
Fork/Join 其實就是指由ForkJoinPool做爲線程池、ForkJoinTask(一般實現其三個抽象子類)爲任務、ForkJoinWorkerThread做爲執行任務的具體線程實體這三者構成的任務調度機制。
ForkJoinWorkerThread 直接繼承了Thread,可是僅僅是爲了增長一些額外的功能,並無對線程的調度執行作任何更改。 ForkJoinWorkerThread 是被ForkJoinPool管理的工做線程,在建立出來以後都被設置成爲了守護線程,由它來執行ForkJoinTasks。該類主要爲了維護建立線程實例時經過ForkJoinPool爲其建立的任務隊列,與其餘兩個線程池整個線程池只有一個任務隊列不一樣,ForkJoinPool管理的全部工做線程都擁有本身的工做隊列,爲了實現任務竊取機制,該隊列被設計成一個雙端隊列,而ForkJoinWorkerThread的首要任務就是執行本身的這個雙端任務隊列中的任務,其次是竊取其餘線程的工做隊列,如下是其代碼片斷:
public class ForkJoinWorkerThread extends Thread {
// 這個線程工做的ForkJoinPool池
final ForkJoinPool pool;
// 這個線程擁有的工做竊取機制的工做隊列
final ForkJoinPool.WorkQueue workQueue;
//建立在給定ForkJoinPool池中執行的ForkJoinWorkerThread。
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
//向ForkJoinPool執行池註冊當前工做線程,ForkJoinPool爲其分配一個工做隊列
this.workQueue = pool.registerWorker(this);
}
//該工做線程的執行內容就是執行工做隊列中的任務
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue); //執行工做隊列中的任務
} catch (Throwable ex) {
exception = ex; //記錄異常
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception); //撤銷工做
}
}
}
}
.....
}
複製代碼
ForkJoinTask :與FutureTask同樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。 ForkJoinTask :咱們要使用 ForkJoin 框架,必須首先建立一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操做的機制,一般狀況下咱們不須要直接繼承 ForkJoinTask 類,而只須要繼承它的子類,Fork/Join框架提供類如下幾個子類:
ForkJoinTask 有一個int類型的status字段:
任務未完成以前status大於等於0,完成以後就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小於0的值,這幾個值也是按大小順序的:0(初始狀態) > NORMAL > CANCELLED > EXCEPTIONAL.
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** 該任務的執行狀態 */
volatile int status; // accessed directly by pool and workers
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16
static final int SMASK = 0x0000ffff; // short bits for tags
// 異常哈希表
//被任務拋出的異常數組,爲了報告給調用者。由於異常不多見,因此咱們不直接將它們保存在task對象中,而是使用弱引用數組。注意,取消異常不會出如今數組,而是記錄在statue字段中
//注意這些都是 static 類屬性,全部的ForkJoinTask共用的。
private static final ExceptionNode[] exceptionTable; //異常哈希鏈表數組
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收以後,相應的異常節點對象的引用隊列
/** * 固定容量的exceptionTable. */
private static final int EXCEPTION_MAP_CAPACITY = 32;
//異常數組的鍵值對節點。
//該哈希鏈表數組使用線程id進行比較,該數組具備固定的容量,由於它只維護任務異常足夠長,以便參與者訪問它們,因此在持續的時間內不該該變得很是大。可是,因爲咱們不知道最後一個joiner什麼時候完成,咱們必須使用弱引用並刪除它們。咱們對每一個操做都這樣作(所以徹底鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變爲isQuiescent時都會調用helpExpungeStaleExceptions
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // 拋出異常的線程id
final int hashCode; // 在弱引用消失以前存儲hashCode
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收以後,會將該節點加入隊列exceptionTableRefQueue
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
this.hashCode = System.identityHashCode(task);
}
}
.................
}
複製代碼
除了status記錄任務的執行狀態以外,其餘字段主要是爲了對任務執行的異常的處理,ForkJoinTask採用了哈希數組 + 鏈表的數據結構(JDK8之前的HashMap實現方法)存放全部(由於這些字段是static)的ForkJoinTask任務的執行異常。
fork() 作的工做只有一件事,既是把任務推入當前工做線程的工做隊列裏(安排任務異步執行)。能夠參看如下的源代碼:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
複製代碼
該方法其實就是將任務經過push方法加入到當前工做線程的工做隊列或者提交隊列(外部非ForkJoinWorkerThread線程經過submit、execute方法提交的任務),等待被線程池調度執行,這是一個非阻塞的當即返回方法。
這裏須要知道,ForkJoinPool線程池經過哈希數組+雙端隊列的方式將全部的工做線程擁有的任務隊列和從外部提交的任務分別映射到哈希數組的不一樣槽位上。
join() 的工做則複雜得多,也是 join() 可使得線程免於被阻塞的緣由——不像同名的 Thread.join()。
將上述流程畫成序列圖的話就是這個樣子:
源代碼以下:
//當計算完成時返回計算結果。此方法與get()的不一樣之處在於,異常完成會致使RuntimeException或Error,而不是ExecutionException,調用線程被中斷不會經過拋出InterruptedException致使方法忽然返回。
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s); //非正常結束,拋出相關的異常堆棧信息
return getRawResult(); //正常結束,返回結果
}
//等待任務執行結束並返回其狀態status,該方法實現了join, get, quietlyJoin. 直接處理已經完成的,外部等待和unfork+exec的狀況,其它狀況轉發到ForkJoinPool.awaitJoin
//若是 status < 0 則返回s;
//不然,若不是ForkJoinWorkerThread ,則等待 externalAwaitDone() 返回
//不然,若 (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 則 返回s;
//不然,返回 wt.pool.awaitJoin(w, this, 0L)
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s : //status爲負數表示任務已經執行結束,直接返回status。
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s : //調用pool的執行邏輯,並等待返回執行結果狀態
wt.pool.awaitJoin(w, this, 0L) : //調用pool的等待機制
externalAwaitDone(); //不是ForkJoinWorkerThread,
}
//拋出與給定狀態關聯的異常(若是有),被取消是CancellationException。
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
public abstract V getRawResult();
//返回給定任務的執行異常(若是有的話),爲了提供準確的異常堆棧信息,若異常不是由當前線程拋出的,將嘗試以記錄的異常爲緣由建立一個與拋出異常類型相同的新異常。
//若是沒有那樣的構造方法將嘗試使用無參的構造函數,並經過設置initCause方法以達到一樣的效果,儘管它可能包含誤導的堆棧跟蹤信息。
private Throwable getThrowableException() {
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
//1. 經過當前任務對象的哈希值到哈希鏈表數組中找到相應的異常節點
int h = System.identityHashCode(this); //當前任務的hash值
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
lock.lock(); //加鎖
try {
expungeStaleExceptions(); //清理被GC回收的任務的異常節點
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)]; //經過取模對應得索引獲取哈希數組槽位中得節點
while (e != null && e.get() != this)
e = e.next; //遍歷找到當前任務對應的異常節點
} finally {
lock.unlock();
}
Throwable ex;
if (e == null || (ex = e.ex) == null) //表示沒有出現任何異常
return null;
if (e.thrower != Thread.currentThread().getId()) { //有異常可是不是由當前線程拋出的
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
Constructor<?>[] cs = ec.getConstructors();// public ctors only
//經過反射找到構造方法,並構造新異常
for (int i = 0; i < cs.length; ++i) {
Constructor<?> c = cs[i];
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c; //記錄下無參構造方法,以備沒有找到指望的構造方法時使用
else if (ps.length == 1 && ps[0] == Throwable.class) {
Throwable wx = (Throwable)c.newInstance(ex); //發現了咱們指望的Throwable類型的參數的構造方法
return (wx == null) ? ex : wx;
}
}
if (noArgCtor != null) { //沒有找到指望的構造方法,只能經過無參構造方法建立新異常
Throwable wx = (Throwable)(noArgCtor.newInstance());
if (wx != null) {
wx.initCause(ex); //將原始異常設置進去
return wx;
}
}
} catch (Exception ignore) {
}
}
return ex;
}
//清除哈希鏈表數組中已經被GC回收掉的任務的異常節點。從exceptionTableRefQueue節點引用隊列中獲取異常節點並移除哈希鏈表數組中得對應節點
private static void expungeStaleExceptions() {
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
if (x instanceof ExceptionNode) {
int hashCode = ((ExceptionNode)x).hashCode; //節點hash
ExceptionNode[] t = exceptionTable;
int i = hashCode & (t.length - 1); //取模獲得哈希表索引
ExceptionNode e = t[i];
ExceptionNode pred = null;
while (e != null) {
ExceptionNode next = e.next;
if (e == x) { //找到了目標節點
if (pred == null)
t[i] = next;
else
pred.next = next;
break;
}
pred = e; //日後遍歷鏈表
e = next;
}
}
}
}
//竊取任務的主要執行方法,除非已經完成了,不然調用exec()並記錄完成時的狀態。
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) { //任務還未完成
try {
completed = exec(); 調用exec()並記錄完成時的狀態。
} catch (Throwable rex) {
return setExceptionalCompletion(rex); //記錄異常並返回相關狀態,並喚醒經過join等待此任務的線程。
}
if (completed)
s = setCompletion(NORMAL); //更新狀態爲正常結束,並喚醒經過join等待此任務的線程。
}
return s;
}
//當即執行此任務的基本操做。返回true表示該任務已經正常完成,不然返回false表示此任務不必定完成(或不知道是否完成)。
//此方法還可能拋出(未捕獲的)異常,以指示異常退出。此方法旨在支持擴展,通常不該以其餘方式調用。
protected abstract boolean exec();
//等待未完成的非ForkJoinWorkerThread線程提交的任務執行結束,並返回任務狀態status
private int externalAwaitDone() {
//如果CountedCompleter任務,等待ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) 返回
//不然,若ForkJoinPool.common.tryExternalUnpush(this),返回 doExec() 結果;
//不然,返回0
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) : //輔助完成外部提交的CountedCompleter任務
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); //輔助完成外部提交的非CountedCompleter任務
if (s >= 0 && (s = status) >= 0) { //表示任務還沒結束,須要阻塞等待。
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //標記有線程須要被喚醒
synchronized (this) {
if (status >= 0) {
try {
wait(0L); //任務還沒結束,無限期阻塞直到被喚醒
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll(); //已經結束了喚醒全部阻塞的線程
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt(); //恢復中斷標識
}
return s;
}
//記錄異常,更新status狀態,喚醒全部等待線程
private int setExceptionalCompletion(Throwable ex) {
int s = recordExceptionalCompletion(ex);
if ((s & DONE_MASK) == EXCEPTIONAL)
internalPropagateException(ex); //調用鉤子函數傳播異常
return s;
}
/** * 對任務異常結束的異常傳播支持的鉤子函數 */
void internalPropagateException(Throwable ex) {
}
//記錄異常並設置狀態status
final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
int h = System.identityHashCode(this); //哈希值
final ReentrantLock lock = exceptionTableLock;
lock.lock(); //加鎖
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) { //遍歷完了都沒找到,說明哈希鏈表數組中不存在該任務對於的異常節點
t[i] = new ExceptionNode(this, ex, t[i]); //建立一個異常節點用頭插法插入哈希鏈表數組
break;
}
if (e.get() == this) // 哈希鏈表數組中已經存在相應的異常節點,退出
break;
}
} finally {
lock.unlock();
}
s = setCompletion(EXCEPTIONAL);
}
return s;
}
//標記任務完成標誌,並喚醒經過join等待此任務的線程。
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { //更新狀態
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); } //喚醒全部等待線程
return completion;
}
}
}
複製代碼
既然ForkJoinTask也是Future的子類,那麼Future最重要的獲取異步任務結果的get方法也必然要實現:
//若是須要,等待計算完成,而後檢索其結果。
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : //是ForkJoinWorkerThread,執行doJoin
externalInterruptibleAwaitDone(); //執行externalInterruptibleAwaitDone
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException(); //被取消的拋出CancellationException
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex); //執行中出現異常的拋出相應的異常
return getRawResult(); //返回正常結果
}
//阻塞非ForkJoinWorkerThread線程,直到完成或中斷。
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0 &&
(s = ((this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) { //根據不一樣的任務類型 返回執行或暫時等待被執行的狀態
while ((s = status) >= 0) { //須要阻塞等待
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(0L); //阻塞等待
else
notifyAll(); //喚醒全部等待線程
}
}
}
}
return s;
}
複製代碼
get方法也是經過實現join方法的doJoin方法實現的,不一樣的是,調用get方法的線程若是被中斷的話,get方法會當即拋出InterruptedException異常,而join方法則不會;另外任務異常完成的的相關異常,get方法會將相關異常都封裝成ExecutionException異常,而join方法則是原樣拋出相關的異常不會被封裝成ExecutionException異常。get方法採用的wait/notifyAll這種線程通訊機制來實現阻塞與喚醒。另外還有超時版本的get方法也相似,因而可知get支持可中斷和/或定時等待完成。
//開始執行此任務,若是須要等待其完成,並返回其結果,若是底層執行此任務時出現異常,則拋出相應的(未捕獲的)RuntimeException或Error。
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
// invoke, quietlyInvoke的實現
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s : //執行此任務,完成返回其status
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //若未完成或須要等待就根據不一樣任務類型執行不一樣的等待邏輯
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
複製代碼
invoke的實現會利用當前調用invoke的線程當即執行exec方法,固然若是exec方法的實現使用了fork/join,其仍是會利用ForkJoinPool線程池的遞歸調度執行策略,等待子任務執行完成,一步步的合併成最終的任務結果,並返回。值得注意的是,該方法不會由於線程被中斷而當即返回,而必須在等到任務執行有告終果以後纔會對中斷狀態進行補償。
//執行兩個任務
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork(); //t2任務交給線程池調度執行
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) //t1任務當即由當前線程執行
t1.reportException(s1); //若t1異常結束,則拋出異常,包括被取消的CancellationException
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) //等待t2執行結束
t2.reportException(s2); //若t2異常結束,則拋出異常,包括被取消的CancellationException
}
//執行任務數組
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = tasks[i];
if (t == null) {
if (ex == null) //都不能爲null
ex = new NullPointerException();
}
else if (i != 0)
t.fork(); //除了第一個任務都交給線程池調度執行
else if (t.doInvoke() < NORMAL && ex == null) //由當前線程執行第一個任務
ex = t.getException(); //記錄第一個任務的異常
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null) //第一個任務異常結束,取消其餘全部任務
t.cancel(false);
else if (t.doJoin() < NORMAL) //有任務異常結束,記錄異常
ex = t.getException();
}
}
if (ex != null)
rethrow(ex); //如有任務異常結束,拋出數組最前面那個異常結束的任務的異常
}
//批量執行任務,返回每一個任務對應的ForkJoinTask實例,
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()])); //將任務封裝成ForkJoinTask,調用上面那個方法實現
return tasks;
}
//下面的邏輯與上面那個invokeAll也是同樣的。
@SuppressWarnings("unchecked")
List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks;
Throwable ex = null;
int last = ts.size() - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = ts.get(i);
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
return tasks;
}
複製代碼
批量任務的執行其實現都是排在前面的任務(只有兩個參數是,第一個參數就是排在前面的任務,是數組或者隊列時,索引越小的就是排在越前面的)由當前線程執行,後面的任務交給線程池調度執行,若是有多個任務都出現異常,只會拋出排在最前面那個任務的異常。
public final void quietlyJoin() {
doJoin();
}
public final void quietlyInvoke() {
doInvoke();
}
複製代碼
quietlyInvoke(),quietlyJoin()這兩個方法就僅僅了是調用了doInvoke和doJoin,而後就沒有而後了,它們就是不關心執行結果版本的invoke和Join,固然異常結束的也不會將異常拋出來,當執行一組任務而且須要將結果或異常的處理延遲到所有任務完成時,這可能頗有用。
public boolean cancel(boolean mayInterruptIfRunning) {
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
複製代碼
其主要經過setCompletion標記還沒有完成的任務的狀態爲CANCELLED,並喚醒經過join等待此任務的線程。已經執行完成的任務沒法被取消,返回true表示取消成功。注意該方法傳入的mayInterruptIfRunning並無使用,所以,ForkJoinTask不支持在取消任務時中斷已經開始執行的任務,固然ForkJoinTask的子類能夠重寫實現。
//取消任務的執行計劃。若是此任務是當前線程最近纔剛剛經過fork安排執行,而且還沒有在另外一個線程中開始執行,則此方法一般會成功,但也不是100%保證會成功。
public boolean tryUnfork() {
Thread t;
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //針對ForkJoinWorkerThread的取消邏輯
ForkJoinPool.common.tryExternalUnpush(this)); //針對外部提交任務的取消邏輯
}
複製代碼
tryUnfork嘗試將該任務從任務隊列中彈出,彈出以後線程池天然不會再調度該任務。該方法的實現只會在任務剛剛被推入任務隊列,而且還處於任務隊列的棧頂時纔可能會成功,不然100%失敗。
public void reinitialize() {
if ((status & DONE_MASK) == EXCEPTIONAL) //有異常
clearExceptionalCompletion(); //從哈希鏈表數組中移除當前任務的異常節點,並將status重置爲0
else
status = 0;
}
複製代碼
若是任務異常結束,會從異常哈希表中清除該任務的異常記錄,該方法僅僅是將任務狀態status重置爲0,使得該任務能夠被從新執行。
任務的執行狀態能夠在多個級別上查詢:
ForkJoinTask 在執行的時候可能會拋出異常,可是咱們沒辦法在主線程裏直接捕獲異常,因此 ForkJoinTask 提供了 isCompletedAbnormally() 方法來檢查任務是否已經拋出異常或已經被取消了,而且能夠經過 ForkJoinTask 的 getException 方法獲取異常。示例以下:
if(task.isCompletedAbnormally()){
System.out.println(task.getException());
}
複製代碼
getException 方法返回 Throwable 對象,若是任務被取消了則返回CancellationException。若是任務沒有完成或者沒有拋出異常則返回 null。
adapt方法主要是爲了兼容傳統的Runnable和Callable任務,經過adapt方法能夠將它們封裝成ForkJoinTask任務,當將 ForkJoinTask 與其餘類型的任務混合執行時,可使用這些方法。
getPool能夠返回執行該任務的線程所在的線程池實例,inForkJonPool能夠斷定當前任務是不是由ForkJoinWorkerThread線程提交的,通常來講這意味着當前任務是內部拆分以後的子任務。
getQueuedTaskCount方法返回已經經過fork安排給當前工做線程執行,但尚未被執行的任務數量,該值是一個瞬間值。由於工做線程調度執行的任務經過fork提交的任務仍是進入的該工做線程的任務隊列,所以能夠經過該任務得知該值。
其它一些方法:
//可能會在承載當前任務的執行池處於靜默(空閒)狀態時執行任務。這個方法可能在有不少任務都經過fork被安排執行,可是一個顯示的join調用都沒有,直到它們都被執行完的設計中使用。
//其實就是若是有一批任務被安排執行,而且不知道它們何時結束,若是但願在這些任務都執行結束以後再安排一個任務,就可使用helpQuiesce。
public static void helpQuiesce() {
Thread t;
//根據執行線程的不一樣類型,調用不一樣的靜默執行邏輯
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
wt.pool.helpQuiescePool(wt.workQueue);
}
else
ForkJoinPool.quiesceCommonPool();
}
//返回被當前工做線程持有的任務數a比其它可能竊取其任務的其它工做線程持有的任務數b多多少的估計值,就是 a - b 的差值。若當前工做線程不是在ForkJoinPool中,則返回0
//一般該值被恆定在一個很小的值3,若超過這個閾值,則就在本地處理。
public static int getSurplusQueuedTaskCount() {
return ForkJoinPool.getSurplusQueuedTaskCount();
}
//獲取但不移除(即不取消執行計劃)安排給當前線程的可能即將被執行的下一個任務。但不能保證該任務將在接下來實際被當即執行。該方法可能在即便任務存在但由於競爭而不可訪問而返回null
//該方法主要是爲了支持擴展,不然可能不會被使用。
protected static ForkJoinTask<?> peekNextLocalTask() {
Thread t; ForkJoinPool.WorkQueue q;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
q = ForkJoinPool.commonSubmitterQueue();
return (q == null) ? null : q.peek();
}
//獲取而且移除(即取消執行)安排給當前線程的可能即將被執行的下一個任務。
//該方法主要是爲了支持擴展,不然可能不會被使用。
protected static ForkJoinTask<?> pollNextLocalTask() {
Thread t;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
null;
}
//若是當前線程被ForkJoinPool運行,獲取而且移除(即取消執行)當前線程即將可能執行的下一個任務。該任務多是從其它線程中竊取來的。
//返回nulll並不必定意味着此任務正在操做的ForkJoinPool處於靜止狀態。該方法主要是爲了支持擴展,不然可能不會被使用。
protected static ForkJoinTask<?> pollTask() {
Thread t; ForkJoinWorkerThread wt;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
null;
}
複製代碼
一般ForkJoinTask只適用於非循環依賴的純函數的計算或孤立對象的操做,不然,執行可能會遇到某種形式的死鎖,由於任務循環地等待彼此。可是,這個框架支持其餘方法和技術(例如使用Phaser、helpQuiesce和complete),這些方法和技術可用於構造解決這種依賴任務的ForkJoinTask子類,爲了支持這些用法,可使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標記一個short類型的值,並使用getForkJoinTaskTag進行檢查。ForkJoinTask實現沒有將這些受保護的方法或標記用於任何目的,可是它們能夠用於構造專門的子類,由此可使用提供的方法來避免從新訪問已經處理過的節點/任務。
ForkJoinTask應該執行相對較少的計算,而且應該避免不肯定的循環。大任務應該被分解成更小的子任務,一般經過遞歸分解。若是任務太大,那麼並行性就不能提升吞吐量。若是過小,那麼內存和內部任務維護開銷可能會超過處理開銷。
ForkJoinTask是可序列化的,這使它們可以在諸如遠程執行框架之類的擴展中使用。只在執行以前或以後序列化任務纔是明智的,而不是在執行期間。
ForkJoinPool:ForkJoinTask 須要經過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務。
ForkJoinPool 與 內部類 WorkQueue 共享的一些常量
// Constants shared across ForkJoinPool and WorkQueue
// 限定參數
static final int SMASK = 0xffff; // 低位掩碼,也是最大索引位
static final int MAX_CAP = 0x7fff; // 工做線程最大容量
static final int EVENMASK = 0xfffe; // 偶數低位掩碼
static final int SQMASK = 0x007e; // workQueues 數組最多64個槽位
// ctl 子域和 WorkQueue.scanState 的掩碼和標誌位
static final int SCANNING = 1; // 標記是否正在運行任務
static final int INACTIVE = 1 << 31; // 失活狀態 負數
static final int SS_SEQ = 1 << 16; // 版本戳,防止ABA問題
// ForkJoinPool.config 和 WorkQueue.config 的配置信息標記
static final int MODE_MASK = 0xffff << 16; // 模式掩碼
static final int LIFO_QUEUE = 0; // LIFO隊列
static final int FIFO_QUEUE = 1 << 16; // FIFO隊列
static final int SHARED_QUEUE = 1 << 31; // 共享模式隊列,負數 ForkJoinPool 中的相關常量和實例字段:
複製代碼
ForkJoinPool 中的相關常量和實例字段
// 低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
// 活躍線程數
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數掩碼
// 工做線程數
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工做線程數增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 建立工做線程標誌
// 池狀態
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
// 實例字段
volatile long ctl; // 主控制參數
volatile int runState; // 運行狀態鎖
final int config; // 並行度|模式
int indexSeed; // 用於生成工做線程索引
volatile WorkQueue[] workQueues; // 主對象註冊信息,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh; // 每一個工做線程的異常信息
final String workerNamePrefix; // 用於建立工做線程的名稱
volatile AtomicLong stealCounter; // 偷取任務總數,也可做爲同步監視器
/** 靜態初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啓動或殺死線程的方法調用者的權限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態pool
static final ForkJoinPool common;
//並行度,對應內部common池
static final int commonParallelism;
//備用線程數,在tryCompensate中使用
private static int commonMaxSpares;
//建立workerNamePrefix(工做線程名稱前綴)時的序號
private static int poolNumberSequence;
//線程阻塞等待新的任務的超時值(以納秒爲單位),默認2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閒超時時間,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
//默認備用線程數
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數,用在在awaitRunStateLock和awaitWork中
private static final int SPINS = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;
複製代碼
ForkJoinPool 的內部狀態都是經過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:
ForkJoinPool.WorkQueue 中的相關屬性:
//初始隊列容量,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 實例字段
volatile int scanState; // Woker狀態, <0: inactive; odd:scanning
int stackPred; // 記錄前一個棧頂的ctl
int nsteals; // 偷取任務數
int hint; // 記錄偷取者索引,初始爲隨機索引
int config; // 池索引和模式
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // 下一個poll操做的索引(棧底/隊列頭)
int top; // 一個push操做的索引(棧頂/隊列尾)
ForkJoinTask<?>[] array; // 任務數組
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當前工做隊列的工做線程,共享模式下爲null
volatile Thread parker; // 調用park阻塞期間爲owner,其餘狀況爲null
volatile ForkJoinTask<?> currentJoin; // 記錄被join過來的任務
volatile ForkJoinTask<?> currentSteal; // 記錄從其餘工做隊列偷取過來的任務
複製代碼
ForkJoinPool採用了哈希數組 + 雙端隊列的方式存聽任務,但這裏的任務分爲兩類:
ForkJoinPool並無把這兩種任務混在一個任務隊列中,對於外部任務,會利用Thread內部的隨機probe值映射到哈希數組的偶數槽位中的提交隊列中,這種提交隊列是一種數組實現的雙端隊列稱之爲Submission Queue,專門存放外部提交的任務。
對於ForkJoinWorkerThread工做線程,每個工做線程都分配了一個工做隊列,這也是一個雙端隊列,稱之爲Work Queue,這種隊列都會被映射到哈希數組的奇數槽位,每個工做線程fork/join分解的任務都會被添加到本身擁有的那個工做隊列中。
在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是咱們所說的哈希數組,其元素就是內部類WorkQueue實現的基於數組的雙端隊列。該哈希數組的長度爲2的冪,而且支持擴容。以下就是該哈希數組的示意結構圖:
如圖,提交隊列位於哈希數組workQueue的奇數索引槽位,工做線程的工做隊列位於偶數槽位。
其完整構造方法以下
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
複製代碼
重要參數解釋
asyncMode表示工做線程內的任務隊列是採用何種方式進行調度,能夠是先進先出FIFO,也能夠是後進先出LIFO。若是爲true,則線程池中的工做線程則使用先進先出方式進行任務調度,默認狀況下是false。
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
//提交到工做隊列
externalPush(task);
return task;
}
複製代碼
ForkJoinPool 自身擁有工做隊列,這些工做隊列的做用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工做隊列被稱爲 submitting queue 。 submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操做)。submitting queue 和其餘 work queue 同樣,是工做線程」竊取「的對象,所以當其中的任務被一個工做線程成功竊取時,就意味着提交的任務真正開始進入執行階段。
PS:以上代碼提交在 Github :github.com/Niuh-Study/…
文章持續更新,能夠公衆號搜一搜「 一角錢技術 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經收錄,歡迎 Star。