Fork/Join就是將一個大任務分解(fork)成許多個獨立的小任務,而後多線程並行去處理這些小任務,每一個小任務處理完獲得結果再進行合併(join)獲得最終的結果。html
流程:任務繼承RecursiveTask,重寫compute方法,使用ForkJoinPool的submit提交任務,任務在某個線程中運行,工做任務中的compute方法的代碼開始對任務進行分析,若是符合條件就進行任務拆分,拆分紅多個子任務,每一個子任務進行數據的計算或操做,獲得結果返回給上一層任務開啓線程進行合併,最終經過get獲取總體處理結果。【只能將任務1個切分爲兩個,不能切分爲3個或其餘數量】數組
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { V result; protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; } }
public abstract class RecursiveAction extends ForkJoinTask<Void> { protected abstract void compute(); public final Void getRawResult() { return null; } protected final void setRawResult(Void mustBeNull) { } protected final boolean exec() { compute(); return true; } }
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; final ForkJoinPool.WorkQueue workQueue; protected ForkJoinWorkerThread(ForkJoinPool pool) { super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); } }
之前1+2+3+...+100這樣的處理能夠用for循環處理,如今使用fork/join來處理:從下面結果能夠看到,大任務被不斷的拆分紅小任務,而後添加到工做線程的隊列中,每一個小任務都會被工做線程從隊列中取出進行運行,而後每一個小任務的結果的合併也由工做線程執行,而後不斷的彙總成最終結果。【task經過ForkJoinPool來執行,分割的子任務添加到當前工做線程的隊列中,進入隊列的頭部,當一個工做線程中沒有任務時,會從其餘工做線程的隊列尾部獲取一個任務。(工做竊取:當前工做線程對應的隊列中沒有任務了,從其餘工做線程對應的隊列中取出任務進行操做,而後將操做結果返還給對應隊列的線程。)】多線程
public class MyFrokJoinTask extends RecursiveTask<Integer> { private int begin; private int end; public MyFrokJoinTask(int begin, int end) { this.begin = begin; this.end = end; } public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> result = pool.submit(new MyFrokJoinTask(1, 100));//提交任務 System.out.println("計算的值:"+result.get());//獲得最終的結果 } @Override protected Integer compute() { int sum = 0; if (end - begin <= 2) { for (int i = begin; i <= end; i++) { sum += i; System.out.println("i:"+i); } } else { MyFrokJoinTask d1 = new MyFrokJoinTask(begin, (begin + end) / 2); MyFrokJoinTask d2 = new MyFrokJoinTask((begin + end) / 2+1, end); d1.fork();//任務拆分 d2.fork();//任務拆分 Integer a = d1.join();//每一個任務的結果 Integer b = d2.join();//每一個任務的結果 sum = a + b;//彙總任務結果 System.out.println("sum:" + sum + ",a:" + a + ",b:" + b); } System.out.println("name:"+Thread.currentThread().getName()); return sum; } } //=========結果============ i:1 i:2 name:ForkJoinPool-1-worker-1 i:3 i:4 name:ForkJoinPool-1-worker-1 sum:10,a:3,b:7 name:ForkJoinPool-1-worker-1 i:5 i:6 i:7 name:ForkJoinPool-1-worker-1 sum:28,a:10,b:18 name:ForkJoinPool-1-worker-1 ............... ............... sum:91,a:28,b:63 sum:99,a:45,b:54 name:ForkJoinPool-1-worker-3 name:ForkJoinPool-1-worker-1 i:23 i:24 i:25 name:ForkJoinPool-1-worker-2 sum:135,a:63,b:72 name:ForkJoinPool-1-worker-2 sum:234,a:99,b:135 name:ForkJoinPool-1-worker-3 sum:325,a:91,b:234 name:ForkJoinPool-1-worker-1 sum:1275,a:325,b:950 name:ForkJoinPool-1-worker-1 sum:5050,a:1275,b:3775 name:ForkJoinPool-1-worker-1 計算的值:5050
ForkJoinPool forkJoinPool = new ForkJoinPool(); //Runtime.getRuntime().availableProcessors()當前操做系統可使用的CPU內核數量 public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } //this調用到下面這段代碼 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), //並行度 checkFactory(factory), //工做線程建立工廠 handler, //異常處理handler asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任務隊列出隊模式 異步:先進先出,同步:後進先出 "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } //上面的this最終調用到下面這段代碼 private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
public static interface ForkJoinWorkerThreadFactory { /** * Returns a new worker thread operating in the given pool. */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); } static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } }
fork/join框架中提供的fork()和join()是最重要的兩個方法,它們和parallelism(」可並行任務數量「)配合工做,能夠致使拆分的子任務T1.一、T1.2甚至TX在fork/join中不一樣的運行效果(上面1+2....+100的每次運行的子任務都是不一樣的)。即TX子任務或等待其餘已存在的線程運行關聯的子任務(sum操做),或在運行TX的線程中」遞歸「執行其餘任務(將1-50進行拆分後的子任務遞歸運行),或啓動一個新的線程執行子任務(運行1-50另外一邊拆分的任務,即50-100的子任務)。併發
fork()用於將新建立的子任務放入當前線程的workQueue隊列中,fork/join框架將根據當前正在併發執行ForkJoinTask任務的ForkJoinWorkerThread線程狀態,決定是讓這個任務在隊列中等待,仍是建立一個新的ForkJoinWorkedThread線程運行它,又或者是喚起其餘正在等待任務的ForkJoinWorkerThread線程運行它。框架
join()用於讓當前線程阻塞,直到對應的子任務完成運行並返回執行結果。或者,若是這個子任務存在於當前線程的任務等待隊列workQueue中,則取出這個子任務進行」遞歸「執行,其目的是儘快獲得當前子任務的運行結果,而後繼續執行。dom
submit:異步
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } public <T> ForkJoinTask<T> submit(Callable<T> task) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); externalPush(job); return job; } public <T> ForkJoinTask<T> submit(Runnable task, T result) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); externalPush(job); return job; } public ForkJoinTask<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; }
final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe();//當前線程的一個隨機數 int rs = runState;//當前容器的狀態 //若是隨機選取的隊列還有空位置能夠存放、隊列加鎖鎖定成功,任務就放入隊列中 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task);//任務加入隊列中 U.putOrderedInt(q, QTOP, s + 1);//挪動下次任務存放的槽的位置 U.putIntVolatile(q, QLOCK, 0);//隊列解鎖 if (n <= 1)//當前數組元素少時,進行喚醒當前線程;或者當沒有活動線程或線程數較少時,添加新的線程 signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0);//隊列解鎖 } externalSubmit(task);//升級版的externalPush } volatile int runState; // lockable status鎖定狀態 // runState: SHUTDOWN爲負數,其餘的爲2的次冪 private static final int RSLOCK = 1; private static final int RSIGNAL = 1 << 1;//喚醒 private static final int STARTED = 1 << 2;//啓動 private static final int STOP = 1 << 29;//中止 private static final int TERMINATED = 1 << 30;//結束 private static final int SHUTDOWN = 1 << 31;//關閉
private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) {//自旋 WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; /** *ForkJoinPool執行器中止工做了,拋出異常 *ForkJoinPool extends AbstractExecutorService *abstract class AbstractExecutorService implements ExecutorService *interface ExecutorService extends Executor *interface Executor執行提交的對象Runnable任務 */ if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } //第一次遍歷,隊列數組未建立,進行建立 else if ((rs & STARTED) == 0 || // initialize初始化 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots,config是CPU核數 int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n];//建立 ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } //第三次遍歷,把任務放入隊列中 else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } } move = true; // move on failure } //第二次遍歷,隊列數組爲空,建立隊列 else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)//當前線程是workerThread,任務直接放入workerThread當前的workQueue ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this);//將任務添加到隨機選取的隊列中或新建立的隊列中 return this; }
push:async
public class ForkJoinPool extends AbstractExecutorService { static final class WorkQueue { final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed,隊列被移除忽略 int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);//任務加入隊列中 U.putOrderedInt(this, QTOP, s + 1);//挪動下次任務存放的槽的位置 if ((n = s - b) <= 1) {//當前數組元素少時,進行喚醒當前線程;或者當沒有活動線程或線程數較少時,添加新的線程 if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m)//數組全部元素都滿了進行2倍擴容 growArray(); } } final ForkJoinTask<?>[] growArray() { ForkJoinTask<?>[] oldA = array; int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;//2倍擴容或初始化 if (size > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; do { // emulate poll from old array, push to new array遍歷從舊數組中取出放到新數組中 ForkJoinTask<?> x; int oldj = ((b & oldMask) << ASHIFT) + ABASE; int j = ((b & mask) << ASHIFT) + ABASE; x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);//從舊數組中取出 if (x != null && U.compareAndSwapObject(oldA, oldj, x, null))//將舊數組取出的位置的對象置爲null U.putObjectVolatile(a, j, x);//放入新數組 } while (++b != t); } return a; } } }
任務的消費的執行鏈路是ForkJoinTask.doExec() -> RecursiveTask.exec()/RecursiveAction.exec() -> 覆蓋重寫的compute()ide
final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec();//消費任務 } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL);//任務執行完設置狀態爲NORMAL,並喚醒其餘等待任務 } return s; } protected abstract boolean exec(); private int setCompletion(int completion) { for (int s;;) { if ((s = status) < 0) return s; if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {//任務狀態修改成NORMAL if ((s >>> 16) != 0)//狀態不是SMASK synchronized (this) { notifyAll(); }//喚醒其餘等待任務 return completion; } } } /** The run status of this task 任務的運行狀態*/ volatile int status; // accessed directly by pool and workers由ForkJoinPool池或ForkJoinWorkerThread控制 static final int DONE_MASK = 0xf0000000; // mask out non-completion bits static final int NORMAL = 0xf0000000; // must be negative static final int CANCELLED = 0xc0000000; // must be < NORMAL static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED static final int SIGNAL = 0x00010000; // must be >= 1 << 16 static final int SMASK = 0x0000ffff; // short bits for tags
任務提交到ForkJoinPool,最終真正的是由繼承Thread的ForkJoinWorkerThread的run方法來執行消費任務的,ForkJoinWorkerThread處理哪一個任務是由join來出隊的;源碼分析
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult();//獲得返回結果 } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; /** * (s = status) < 0 判斷任務是否已經完成,完成直接返回s * 任務未完成: * 1)線程是ForkJoinWorkerThread,tryUnpush任務出隊而後消費任務doExec * 1.1)出隊或消費失敗,執行awaitJoin進行自旋,若是任務狀態是完成就退出,不然繼續嘗試出隊,直到任務完成或超時爲止; * 2)若是線程不是ForkJoinWorkerThread,執行externalAwaitDone進行出隊消費 */ return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } private void reportException(int s) { if (s == CANCELLED)//取消 throw new CancellationException(); if (s == EXCEPTIONAL)//異常 rethrow(getThrowableException()); }
public class ForkJoinPool{ final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { int s = 0; if (task != null && w != null) { ForkJoinTask<?> prevJoin = w.currentJoin; U.putOrderedObject(w, QCURRENTJOIN, task); CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>)task : null; for (;;) { if ((s = task.status) < 0)//任務完成退出 break; if (cc != null)//當前任務即將完成,檢查是否還有其餘的等待任務,若是有 //運行當前隊列的其餘任務,若當前的隊列中沒有任務了,則竊取其餘隊列的任務並運行 helpComplete(w, cc, 0); //當前隊列沒有任務了,或隊列只剩下最後一個任務執行完了 else if (w.base == w.top || w.tryRemoveAndExec(task)) helpStealer(w, task);//竊取其餘隊列的任務 if ((s = task.status) < 0) break; long ms, ns; if (deadline == 0L) ms = 0L; else if ((ns = deadline - System.nanoTime()) <= 0L)//超時退出 break; else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) ms = 1L; if (tryCompensate(w)) {//當前隊列阻塞了 task.internalWait(ms);//進行等待 U.getAndAddLong(this, CTL, AC_UNIT); } } U.putOrderedObject(w, QCURRENTJOIN, prevJoin); } return s; } }
private int externalAwaitDone() { /** * 當前任務是CountedCompleter * 1)是則執行ForkJoinPool.common.externalHelpComplete() * 2)不然執行ForkJoinPool.common.tryExternalUnpush(this)進行任務出隊 * 2.1)出隊成功,進行doExec()消費,不然進行阻塞等待 */ int s = ((this instanceof CountedCompleter) ? // try helping ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) {//任務未完成 boolean interrupted = false; do { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//任務狀態標記爲SIGNAL synchronized (this) { if (status >= 0) { try { wait(0L);//阻塞等待 } catch (InterruptedException ie) {//有中斷異常 interrupted = true;//設置中斷標識爲true } } else notifyAll();//任務完成喚醒其餘任務 } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt();//當前線程進行中斷 } return s; } final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { WorkQueue[] ws; int n; int r = ThreadLocalRandom.getProbe(); //沒有任務直接結束,有任務則執行helpComplete //helpComplete:運行隨機選取的隊列的任務,若選取的隊列中沒有任務了,則竊取其餘隊列的任務並運行 return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 : helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks); }
任務是由workThread來竊取的,workThread是一個線程。線程的全部邏輯都是由run()方法執行:
public class ForkJoinWorkerThread extends Thread { public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart();//初始化狀態 pool.runWorker(workQueue);//處理任務隊列 } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } } } public class ForkJoinPool{ final void runWorker(WorkQueue w) { w.growArray(); // allocate queue,隊列初始化 int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) {//自旋 if ((t = scan(w, r)) != null)//從隊列中竊取任務成功,scan()進行任務竊取 w.runTask(t);//執行任務,內部方法調用了doExec()進行任務的消費 else if (!awaitWork(w, r))//隊列沒有任務了則結束 break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } } }
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; if ((q = ws[k]) != null) { //隨機選中了非空隊列 q if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //從尾部出隊,b是尾部下標 if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出隊 q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; //出隊成功,成功竊取一個任務! } } else if (oldSum == 0 && // try to activate 隊列沒有激活,嘗試激活 w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; }
//k = k + 1表示取下一個隊列 若是(k + 1) & m == origin表示已經遍歷完全部隊列了 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
volatile int scanState; // versioned, <0: inactive; odd:scanning,版本標記,小於0暫停,奇數進行掃描其餘任務 static final int SCANNING = 1; // false when running tasks,有任務執行是false /** * Executes the given task and any remaining local tasks. * 執行給定的任務和任何剩餘的本地任務 */ final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy,暫停掃描,當前有任務執行 (currentSteal = task).doExec();//執行竊取的任務 U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC,竊取的任務執行完置爲null execLocalTasks();//執行本地的任務 ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow,竊取計數溢出 transferStealCount(pool);//重置竊取計數 scanState |= SCANNING;//繼續掃描隊列 if (thread != null) thread.afterTopLevelExec(); } } static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread { @Override // to erase ThreadLocals,清除threadLocals void afterTopLevelExec() { eraseThreadLocals(); } /** * Erases ThreadLocals by nulling out Thread maps. */ final void eraseThreadLocals() { U.putObject(this, THREADLOCALS, null);//threadLocals置爲null U.putObject(this, INHERITABLETHREADLOCALS, null);//inheritablethreadlocals置爲null } }
對於fork/join來講,在使用時仍是存在下面的一些問題的: