fork/join做爲一個併發框架在jdk7的時候就加入到了咱們的java併發包java.util.concurrent中,而且在java 8 的lambda並行流中充當着底層框架的角色。這樣一個優秀的框架設計,我本身想了解一下它的底層代碼是如何實現的,因此我嘗試的去閱讀了JDK相關的源碼。下面我打算分享一下閱讀完以後的心得~。
瞭解設計原理,這僅僅是第一步!要了解別人整個的實現思路, 還須要瞭解別人爲了實現這個框架定義了哪些角色,並瞭解這些角色的職責範圍是什麼的。由於知道誰負責了什麼,誰作什麼,這樣整個邏輯才能串起來!在JAVA裏面角色是以類的形式定義的,而瞭解類的行爲最直接的方式就是看定義的公共方法~。框架
ForkJoinPool.WorkQueue: 雙端隊列就是它,它負責存儲接收的任務。ide
public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 2; //閥值 private int start; private int end; public CountTask(int start,int end){ this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if(canCompute){ for(int i = start; i <= end; i++){ sum += i; } }else{ int middle = (start + end) / 2; CountTask leftTask = new CountTask(start,middle); CountTask rightTask = new CountTask(middle + 1,end); //執行子任務 leftTask.fork(); rightTask.fork(); //等待子任務執行完,並獲得其結果 Integer rightResult = rightTask.join(); Integer leftResult = leftTask.join(); //合併子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1,200); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask); System.out.println(forkJoinTask.get()); } }
其實顯然易見負責整個fork/join初始化工做的就是ForkJoinPool!初始化代碼就是那一行 ForkJoinPool forkJoinPool = new ForkJoinPool(),點進去查看源碼。
ForkJoinPool forkJoinPool = new ForkJoinPool(); //最終調用到這段代碼 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), //並行度,當前機器的cpu核數 checkFactory(factory), //工做線程建立工廠 handler, //異常處理handler asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任務隊列出隊模式 異步:先進先出,同步:後進先出 "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
看完初始化的代碼咱們能夠知道原來建立ForkJoinPool建立workerThread的工做都是統一由一個叫ForkJoinWorkerThreadFactory的工廠去建立,建立出來的線程都有一個統一的前輟名稱"ForkJoinPool-" + nextPoolId() + "-worker-".隊列出隊模式是LIFO(後進先出),那這樣後面的入隊的任務是會被先處理的。因此上面提到對代碼作了一些修改就是先處理rightTask,再處理leftTask。這實際上是對代碼的一種優化!
//執行子任務 leftTask.fork(); rightTask.fork(); Integer rightResult = rightTask.join(); Integer leftResult = leftTask.join();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
leftTask.fork(); rightTask.fork();
代碼調用路徑 submit(ForkJoinTask<T> task) -> externalPush(ForkJoinTask<?> task) -> externalSubmit(ForkJoinTask<?> task)
private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { //採用循環入隊的方式 WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } else if ((rs & STARTED) == 0 || // initialize 初始化操做 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots //config就是cpu的核數 int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //算出workQueues的大小n,n必定是2的次方數 workQueues = new WorkQueue[n]; //初始化隊列,而後跳到最外面的循環繼續把任務入隊~ ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } else if ((q = ws[k = r & m & SQMASK]) != null) { //選中了一個一個非空隊列 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //利用cas操做加鎖成功! ForkJoinTask<?>[] a = q.array; int s =; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //計算出任務在隊列中的位置 U.putOrderedObject(a, j, task); //把任務放在隊列中 U.putOrderedInt(q, QTOP, s + 1); //更新一次存放的位置 submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); //利用cas操做釋放鎖! } if (submitted) { signalWork(ws, q); return; //任務入隊成功了!跳出循環! } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue 選中的隊列是空,初始化完隊列,而後繼續入隊! q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
經過對externalSubmit方法的代碼進行分析,咱們知道了第一次提交任務給forkJoinPool時是在無限循環for (;;)中入隊。第一步先檢查workQueues是否是尚未建立,若是沒有,則進行建立。以後跳到外層for循環並隨機選取workQueues裏面一個隊列,並判斷隊列是否已建立。沒有建立,則進行建立!後又跳到外層for循環直到選到一個非空隊列而且加鎖成功!這樣最後才把任務入隊~。
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); //workerThread直接入本身的workQueue else ForkJoinPool.common.externalPush(this); return this; }
final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //隨機選取了一個非空隊列,而且加鎖成功!下面是普通的入隊過程~ ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; //結束方法 } U.compareAndSwapInt(q, QLOCK, 1, 0); //必定要釋放鎖! }
//這個就是上面的externalSummit方法,邏輯是同樣的~ externalSubmit(task); }
從代碼中咱們知道了提交一個fork任務的過程和第一次提交到forkJoinPool的過程是大同小異的。主要區分了提交任務的線程是否是workerThread,若是是,任務直接入workerThread當前的workQueue,不是則嘗試選中一個workQueue q。若是q非空而且加鎖成功則進行入隊,不然執行與第一次任務提交到forkJoinPool差很少的邏輯~。
/** The run status of this task */ volatile int status; // accessed directly by pool and workers static final int DONE_MASK = 0xf0000000; // mask out non-completion bits static final int NORMAL = 0xf0000000; // must be negative static final int CANCELLED = 0xc0000000; // must be < NORMAL static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED static final int SIGNAL = 0x00010000; // must be >= 1 << 16 static final int SMASK = 0x0000ffff; // short bits for tags
final int doExec() { //任務的執行入口 int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { private static final long serialVersionUID = 5232453952276485270L; /** * The result of the computation. */ V result; /** * The main computation performed by this task. * @return the result of the computation */ protected abstract V compute(); //咱們實現的處理邏輯 public final V getRawResult() { //獲取返回計算結果 return result; } protected final void setRawResult(V value) { result = value; } /** * Implements execution conventions for RecursiveTask. */ protected final boolean exec() { result = compute(); //存儲計算結果 return true; } }
在代碼中咱們看到任務的真正執行鏈路是 doExec -> exec -> compute -> 最後設置status 和 result。既然定義狀態status而且仍是volatile類型咱們能夠推斷出workerThread在獲取到執行任務以後都會先判斷status是否是已完成或者異常狀態,才決定要不要處理該任務。
Integer rightResult = rightTask.join() public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } //執行處理前先判斷staus是否是已完成,若是完成了就直接返回 //由於這個任務可能被其它線程竊取過去處理完了 private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
先判斷任務是否已經處理完成,任務完成直接返回,沒有則直接嘗試出隊tryUnpush(this) 而後執行任務處理doExec()。若是沒有出隊成功或者處理成功,則執行wt.pool.awaitJoin(w, this, 0L)。wt.pool.awaitJoin(w, this, 0L)的處理邏輯簡單來講也是在一個for(;;)中不斷的輪詢任務的狀態是否是已完成,完成就直接退出方法。否就繼續嘗試出隊處理。直到任務完成或者超時爲止。
private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? // try helping ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { boolean interrupted = false; do { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } else notifyAll(); } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s;
externalAwaitDone的處理邏輯其實也比較簡單,當前線程本身先嚐試把任務出隊ForkJoinPool.common.tryExternalUnpush(this) ? doExec()而後處理掉,若是不成功就交給workerThread去處理,而後利用object/wait的經典方法去監放任務status的狀態變動。
public void run() { //線程run方法 if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); pool.runWorker(workQueue); //在這裏處理任務隊列! } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } } /** * Top-level runloop for workers, called by */ final void runWorker(WorkQueue w) { w.growArray(); // allocate queue 進行隊列的初始化 int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) { //又是無限循環處理任務! if ((t = scan(w, r)) != null) //在這裏獲取任務! w.runTask(t); else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
其實只要看下面的英文註釋就知道了大概scan(WorkQueue w, int r)就是用來竊取任務的!
/** * Scans for and tries to steal a top-level task. Scans start at a * random location, randomly moving on apparent contention, * otherwise continuing linearly until reaching two consecutive * empty passes over all queues with the same checksum (summing * each base index of each queue, that moves on each steal), at * which point the worker tries to inactivate and then re-scans, * attempting to re-activate (itself or some other worker) if * finding a task; otherwise returning null to await work. Scans * otherwise touch as little memory as possible, to reduce * disruption on other scanning threads. * * @param w the worker (via its WorkQueue) * @param r a random seed * @return a task, or null if none found */ private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; if ((q = ws[k]) != null) { //隨機選中了非空隊列 q if ((n = (b = q.base) - < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //從尾部出隊,b是尾部下標 if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出隊 q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; //出隊成功,成功竊取一個任務! } } else if (oldSum == 0 && // try to activate 隊列沒有激活,嘗試激活 w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; }
//k = k + 1表示取下一個隊列 若是(k + 1) & m == origin表示 已經遍歷完全部隊列了 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }