Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。也是當前執行速度最快的併發框架。算法
工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。工做竊取的運行流程圖以下:安全
那麼爲何須要使用工做竊取算法呢?假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。多線程
工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。併發
咱們能夠經過一個實例的改進來逐步剖析fork/join框架的使用,而後再來對其任務的源碼進行分析其實現方式。框架
咱們先來創建一個實驗,該實驗是投擲兩粒骰子一億次,並獲取出現每種結果(兩骰子的點數相加的和,必然在2到12之間)與其出現機率的狀況,咱們先採用線程調度和等待線程池中的某項任務完成來處理。 dom
public class ManualDiceRollsOne { //投擲兩次骰子的次數 private static final int N = 100000000; //一次的佔比 private final double fraction; //每次投2次骰子的點數之和與機率的映射 private final Map<Integer,Double> results; //計算機線程數 private final int numbersOfThreads; //線程池 private final ExecutorService executor; //每一個線程的工做次數 private final int workPerThread; public ManualDiceRollsOne() { fraction = 1.0 / N; results = new ConcurrentHashMap<>(); numbersOfThreads = Runtime.getRuntime().availableProcessors() * 2; executor = Executors.newFixedThreadPool(numbersOfThreads); workPerThread = N / numbersOfThreads; } public void simulateDiceRoles() { //計算全部投擲2次骰子的結果機率 List<Future<?>> futures = submitJobs(); //等待結果,拿取結果 awaitCompletion(futures); //打印結果 printResults(); } private void printResults() { //等同於results.entrySet().forEach(entry -> System.out.println(entry)); results.entrySet().forEach(System.out::println); } private List<Future<?>> submitJobs() { List<Future<?>> futures = new ArrayList<>(); for (int i = 0;i < numbersOfThreads;i++) { //我把個人全部計算任務所有交給Future集合,彼此間不影響 futures.add(executor.submit(makeJob())); } return futures; } private Runnable makeJob() { return () -> { //ThreadLocalRandom對應於不一樣線程都有一個線程的隨機種子值 //在多線程下當使用ThreadLocalRandom來生成隨機數 ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = 0;i < workPerThread;i++) { int entry = twoDiceThrows(random); //獲取每次投擲2個骰子的點數之和,增長每次的機率(億分之一),存入 //線程安全集合ConcurrentHashMap中 accumuLateResult(entry); } }; } private void accumuLateResult(int entry) { //Map的compute方法第二參數爲BiFunction的函數式接口,給定兩種不一樣的參數對象,返回另外一個結果對象,這三種對象 //能夠相同,能夠不一樣 //若是results的entry鍵的值爲null(該鍵不存在),則把該值設爲fraction(單次機率億分之一) //不然將該鍵的值設爲原值加上fraction(單次機率億分之一) results.compute(entry,(key,previous) -> previous == null ? fraction : previous + fraction); } private int twoDiceThrows(ThreadLocalRandom random) { int firstThrow = random.nextInt(1,7); int secondThrow = random.nextInt(1,7); return firstThrow + secondThrow; } private void awaitCompletion(List<Future<?>> futures) { //等待全部的計算任務完成後,拿取計算結果,關閉線程池 futures.forEach(future -> { try { future.get(); }catch (Exception e) { e.printStackTrace(); } }); executor.shutdown(); } public static void main(String[] args) { ManualDiceRollsOne rolls = new ManualDiceRollsOne(); long start = System.currentTimeMillis(); rolls.simulateDiceRoles(); System.out.println(System.currentTimeMillis() - start); } }
運行結果異步
2=0.027757480001947783async
3=0.05559653000661176ide
4=0.08333084999680387函數
5=0.11108438998219564
6=0.13888012996756519
7=0.16669714995292353
8=0.13887485996756796
9=0.11109162998219183
10=0.0833178699968107
11=0.05558395000660965
12=0.02778516000195242
5638
這是一個傳統多線程的調度計算,因爲有ConcurrentHashMap的存在,在多線程中運行速度較慢,運行完時間爲5秒6,此時甚至比不過單線程的速度。
public class ManualDiceRollsThree { //投擲兩次骰子的次數 private static final int N = 100000000; //一次的佔比 private double fraction = 1.0 / N; //每次投2次骰子的點數之和與機率的映射 private Map<Integer,Double> results = new HashMap<>(); private void printResults() { //等同於results.entrySet().forEach(entry -> System.out.println(entry)); results.entrySet().forEach(System.out::println); } public void simulateDiceRoles() throws InterruptedException { for (int i = 0;i < N;i++) { int entry = twoDiceThrows(); results.compute(entry,(k,v) -> v == null ? fraction : v + fraction); } printResults(); } private int twoDiceThrows() { ThreadLocalRandom random = ThreadLocalRandom.current(); int firstThrow = random.nextInt(1,7); int secondThrow = random.nextInt(1,7); return firstThrow + secondThrow; } public static void main(String[] args) throws InterruptedException { ManualDiceRollsThree manualDiceRollsThree = new ManualDiceRollsThree(); long start = System.currentTimeMillis(); manualDiceRollsThree.simulateDiceRoles(); System.out.println(System.currentTimeMillis() - start); } }
運行結果
2=0.027763110001948726
3=0.05556761000660691
4=0.08331852999681036
5=0.11113696998216796
6=0.13886518996757305
7=0.16663615995295564
8=0.13883302996758998
9=0.11116849998215136
10=0.08339849999676827
11=0.055518730006598724
12=0.027793670001953846
1600
此時咱們對多線程的例子進行一次fork/join框架的改造
public class ManualDiceRollsFour { //投擲兩次骰子的次數 private static final int N = 100000000; //一次的佔比 private double fraction = 1.0 / N; //每次投2次骰子的點數之和與機率的映射 private Map<Integer,Double> results = new ConcurrentHashMap<>(); private final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); private AtomicInteger count = new AtomicInteger(0); private void printResults() { //等同於results.entrySet().forEach(entry -> System.out.println(entry)); results.entrySet().forEach(System.out::println); } public void simulateDiceRoles() throws InterruptedException { ForkJoinTask<Void> task = forkJoinPool.submit(makeJob()); task.join(); //打印結果 printResults(); // System.out.println(count.get()); } private CountTask makeJob() { CountTask countTask = new CountTask(0,N); return countTask; } private void accumuLateResult(int entry) { //Map的compute方法第二參數爲BiFunction的函數式接口,給定兩種不一樣的參數對象,返回另外一個結果對象,這三種對象 //能夠相同,能夠不一樣 //若是results的entry鍵的值爲null(該鍵不存在),則把該值設爲fraction(單次機率億分之一) //不然將該鍵的值設爲原值加上fraction(單次機率億分之一) results.compute(entry,(key,previous) -> previous == null ? fraction : previous + fraction); } private int twoDiceThrows() { ThreadLocalRandom random = ThreadLocalRandom.current(); int firstThrow = random.nextInt(1,7); int secondThrow = random.nextInt(1,7); return firstThrow + secondThrow; } private class CountTask extends RecursiveAction { private static final int THRESHOLD = 2000000; private int start; private int end; public CountTask(int start,int end) { this.start = start; this.end = end; } @Override protected void compute() { boolean canCompute = (end - start) <= THRESHOLD; //最終計算,全部的最終拆分都是在這裏計算 if (canCompute) { for (int i = start;i < end;i++) { int entry = twoDiceThrows(); accumuLateResult(entry); // count.incrementAndGet(); } }else { //並行計算的規模,拆分紅50個並行計算 int step = (start + end) / 50; //建立子任務線程集合 List<CountTask> subTasks = new ArrayList<>(); //每一個並行子任務的開始值 int pos = start; //並行執行50個分叉線程 for (int i = 0; i < 50; i++) { //每一個並行子任務的結束值 int lastOne = pos + step; if (lastOne > end) { lastOne = end; } //創建一個子任務的線程 CountTask subTask = new CountTask(pos, lastOne); //建立下一個並行子任務的開始值 pos += step + 1; //將當前子任務線程添加到線程集合 subTasks.add(subTask); //執行該線程,實際上是一個遞歸,判斷lastOne-pos是否小於THRESHOLD,小於則真正執行,不然繼續分叉50個子線程 subTask.fork(); } for (CountTask task : subTasks) { task.join(); } } } } public static void main(String[] args) throws InterruptedException { ManualDiceRollsFour manualDiceRollsFour = new ManualDiceRollsFour(); long start = System.currentTimeMillis(); manualDiceRollsFour.simulateDiceRoles(); System.out.println(System.currentTimeMillis() - start); } }
運行結果
2=0.027765680001949157
3=0.055569410006607214
4=0.08334217999679791
5=0.11114915998216154
6=0.13889079996755957
7=0.16668756995292858
8=0.13887695996756685
9=0.11111537998217932
10=0.08329060999682505
11=0.055536280006601664
12=0.0277754800019508
6185
按照常理來講,多線程在如此大數據量的狀況下理應快過單線程,形成這種狀況的結果,只能說明是ConcurrentHashMap在億級運算的並行下阻礙了運行的速度,如今咱們要將ConcurrentHashMap去掉,徹底在沒有ConcurrentHashMap的狀況下使用fork/join框架。
public class ManualDiceRollsFive { //投擲兩次骰子的次數 private static final int N = 100000000; //一次的佔比 private double fraction = 1.0 / N; //每次投2次骰子的點數之和與機率的映射 private Map<Integer,Double> results; private final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); private void printResults() { //等同於results.entrySet().forEach(entry -> System.out.println(entry)); results.entrySet().forEach(System.out::println); } public void simulateDiceRoles() throws InterruptedException { ForkJoinTask<Map<Integer, Double>> result = forkJoinPool.submit(makeJob()); try { this.results = result.get(); } catch (ExecutionException e) { e.printStackTrace(); } //打印結果 printResults(); forkJoinPool.shutdown(); // System.out.println(count.get()); } private CountTask makeJob() { CountTask countTask = new CountTask(0,N); return countTask; } private int twoDiceThrows(ThreadLocalRandom random) { int firstThrow = random.nextInt(1,7); int secondThrow = random.nextInt(1,7); return firstThrow + secondThrow; } private class CountTask extends RecursiveTask<Map<Integer,Double>> { private static final int THRESHOLD = 2000000; private int start; private int end; public CountTask(int start,int end) { this.start = start; this.end = end; } @Override protected Map<Integer,Double> compute() { Map<Integer,Double> result = new HashMap<>(); IntStream.range(2,13).sequential().forEach(i -> result.put(i,0.0)); ThreadLocalRandom random = ThreadLocalRandom.current(); boolean canCompute = (end - start) <= THRESHOLD; //最終計算,全部的最終拆分都是在這裏計算 if (canCompute) { for (int i = start;i < end;i++) { int entry = twoDiceThrows(random); result.compute(entry,(k,v) -> v == 0.0 ? fraction : v + fraction); // accumuLateResult(entry); // count.incrementAndGet(); } }else { //並行計算的規模,拆分紅50個並行計算 int step = (start + end) / 50; //建立子任務線程集合 List<CountTask> subTasks = new ArrayList<>(); //每一個並行子任務的開始值 int pos = start; //並行執行50個分叉線程 for (int i = 0; i < 50; i++) { //每一個並行子任務的結束值 int lastOne = pos + step; if (lastOne > end) { lastOne = end; } //創建一個子任務的線程 CountTask subTask = new CountTask(pos, lastOne); //建立下一個並行子任務的開始值 pos += step + 1; //將當前子任務線程添加到線程集合 subTasks.add(subTask); //執行該線程,實際上是一個遞歸,判斷lastOne-pos是否小於THRESHOLD,小於則真正執行,不然繼續分叉50個子線程 subTask.fork(); } for (CountTask task : subTasks) { Map<Integer,Double> taskMap = task.join(); result.entrySet().stream().forEach(entry -> result.compute(entry.getKey(), (k,v) -> v == 0.0 ? taskMap.get(k) : v + taskMap.get(k))); } } return result; } } public static void main(String[] args) throws InterruptedException { ManualDiceRollsFive manualDiceRollsFive = new ManualDiceRollsFive(); long start = System.currentTimeMillis(); manualDiceRollsFive.simulateDiceRoles(); System.out.println(System.currentTimeMillis() - start); } }
運行結果
2=0.02779156000000586
3=0.055543890000069124
4=0.0833797699999038
5=0.11111334999973911
6=0.13887251999957423
7=0.16666815999940918
8=0.13892032999957402
9=0.11105938999973942
10=0.08332794999990413
11=0.05555078000006909
12=0.02777181000000576
546
這個纔是多線程應有的速度,徹底不存在鎖的限制。經過這樣一個例子的改造,咱們能夠看到CountTask任務類繼承過兩種父類RecursiveTask和RecursiveAction,而這兩種類其實又都繼承於同一個父類ForkJoinTask。
a.RecursiveAction:用於沒有返回結果的任務
b.RecursiveTask:用於有返回結果的任務
而全部這些任務對象須要提交到ForkJoinPool線程池來執行
private final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
跟進源碼咱們能夠看到
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; //工做線程名前綴 this.factory = factory; //工做線程建立工廠 this.ueh = handler; //異常處理handler this.config = (parallelism & SMASK) | mode; //並行度,當前機器的cpu核數 mode 任務隊列出隊模式 異步:先進先出,同步:後進先出 long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
看完初始化的代碼咱們能夠知道原來建立ForkJoinPool建立workerThread的工做都是統一由一個叫ForkJoinWorkerThreadFactory的工廠去建立,建立出來的線程都有一個統一的前輟名稱"ForkJoinPool-" + nextPoolId() + "-worker-".隊列出隊模式是LIFO(後進先出),那這樣後面的入隊的任務是會被先處理的。因此上面代碼到50個分岔,越後面的任務會越先處理,這實際上是對代碼的一種優化!
提交咱們用的是submit()方法,咱們來看一下該方法的源代碼
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; }
final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } externalSubmit(task); }
private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { //採用循環入隊的方式 WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { // initialize 初始化操做 U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots //config就是cpu的核數 int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //算出workQueues的大小n,n必定是2的次方數 workQueues = new WorkQueue[n]; //初始化隊列,而後跳到最外面的循環繼續把任務入隊 ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } else if ((q = ws[k = r & m & SQMASK]) != null) { //選中了一個一個非空隊列 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //利用cas操做加鎖成功! ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //計算出任務在隊列中的位置 U.putOrderedObject(a, j, task); //把任務放在隊列中 U.putOrderedInt(q, QTOP, s + 1); //更新一次存放的位置 submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); //利用cas操做釋放鎖! } if (submitted) { signalWork(ws, q); return; //任務入隊成功了!跳出循環! } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue 選中的隊列是空,初始化完隊列,而後繼續入隊! q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
咱們大體說一下上面這些代碼的含義:
經過對externalSubmit方法的代碼進行分析,咱們知道了第一次提交任務給forkJoinPool時是在無限循環for (;;)中入隊。第一步先檢查workQueues是否是尚未建立,若是沒有,則進行建立。以後跳到外層for循環並隨機選取workQueues裏面一個隊列,並判斷隊列是否已建立。沒有建立,則進行建立!後又跳到外層for循環直到選到一個非空隊列而且加鎖成功!這樣最後才把任務入隊~。
因此咱們知道fork/join的任務隊列workQueues並非初始化的時候就建立好了,而是在有任務提交的時候才建立!而且每次入隊時都須要利用cas操做來進行加鎖和釋放鎖!
而後咱們來看一下任務的分岔fork()方法,此處又被稱爲二次提交
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); //workerThread直接入本身的workQueue else ForkJoinPool.common.externalPush(this); return this; }
final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //隨機選取了一個非空隊列,而且加鎖成功!下面是普通的入隊過程~ ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; //結束方法 } U.compareAndSwapInt(q, QLOCK, 1, 0); //必定要釋放鎖! } externalSubmit(task); //這個就是上面的externalSubmit方法,邏輯是同樣的~ }
從代碼中咱們知道了提交一個fork任務的過程和第一次提交到forkJoinPool的過程是大同小異的。主要區分了提交任務的線程是否是workerThread,若是是,任務直接入workerThread當前的workQueue,不是則嘗試選中一個workQueue q。若是q非空而且加鎖成功則進行入隊,不然執行與第一次任務提交到forkJoinPool差很少的邏輯~。
咱們再來看一下join()方法
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
執行處理前先判斷staus是否是已完成,若是完成了就直接返回
由於這個任務可能被其它線程竊取過去處理完了
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
代碼的調用鏈是從上到下。總體處理邏輯以下:
線程是workerThread:
先判斷任務是否已經處理完成,任務完成直接返回,沒有則直接嘗試出隊tryUnpush(this) 而後執行任務處理doExec()。若是沒有出隊成功或者處理成功,則執行wt.pool.awaitJoin(w, this, 0L)。wt.pool.awaitJoin(w, this, 0L)的處理邏輯簡單來講也是在一個for(;;)中不斷的輪詢任務的狀態是否是已完成,完成就直接退出方法。否就繼續嘗試出隊處理。直到任務完成或者超時爲止。
線程不是workerThread:
直接進行入externalAwaitDone()
private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? // try helping ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { boolean interrupted = false; do { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } else notifyAll(); } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s; }
externalAwaitDone的處理邏輯其實也比較簡單,當前線程本身先嚐試把任務出隊ForkJoinPool.common.tryExternalUnpush(this) ? doExec()而後處理掉,若是不成功就交給workerThread去處理,而後利用object/wait的經典方法去監放任務status的狀態變動。
最後說一下工做竊取,須要看一下ForkJoinWorkerThread
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); pool.runWorker(workQueue); //在這裏處理任務隊列! } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
final void runWorker(WorkQueue w) { w.growArray(); // allocate queue 進行隊列的初始化 int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) { //又是無限循環處理任務! if ((t = scan(w, r)) != null) //在這裏獲取任務! w.runTask(t); else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; if ((q = ws[k]) != null) { //隨機選中了非空隊列 q if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //從尾部出隊,b是尾部下標 if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出隊 q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; //出隊成功,成功竊取一個任務! } } else if (oldSum == 0 && // try to activate 隊列沒有激活,嘗試激活 w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } if ((k = (k + 1) & m) == origin) { // continue until stable k = k + 1表示取下一個隊列 若是(k + 1) & m == origin表示 已經遍歷完所 //有隊列了 if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
因此咱們知道任務的竊取從workerThread運行的那一刻就已經開始了!先隨機選中一條隊列看能不能竊取到任務,取不到則竊取下一條隊列,直接遍歷完一遍全部的隊列,若是都竊取不到就返回null。