多線程高併發編程(8) -- Fork/Join源碼分析

一.概念

  Fork/Join就是將一個大任務分解(fork)成許多個獨立的小任務,而後多線程並行去處理這些小任務,每一個小任務處理完獲得結果再進行合併(join)獲得最終的結果。html

  流程:任務繼承RecursiveTask,重寫compute方法,使用ForkJoinPool的submit提交任務,任務在某個線程中運行,工做任務中的compute方法的代碼開始對任務進行分析,若是符合條件就進行任務拆分,拆分紅多個子任務,每一個子任務進行數據的計算或操做,獲得結果返回給上一層任務開啓線程進行合併,最終經過get獲取總體處理結果。【只能將任務1個切分爲兩個,不能切分爲3個或其餘數量數組

  • ForkJoinTask:表明fork/join裏面的任務類型,通常用它的兩個子類RecursiveTask(任務有返回值)和RecursiveAction(任務沒有返回值),任務的處理邏輯包括任務的切分都是在重寫compute方法裏面進行處理。只有ForkJoinTask任務能夠被拆分運行和合並運行。可查看上篇Future源碼分析的類圖結構】【ForkJoinTask使用了模板模式進行設計,將ForkJoinTask的執行相關代碼進行隱藏,經過提供抽象類(即子類RecursiveTask、RecursiveAction)暴露用戶的實際業務處理。】
    • RecursiveTask:在進行exec以後會使用一個result的變量進行接受返回的結果;
      public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
          V result;
          protected abstract V compute();
      
          public final V getRawResult() {
              return result;
          }
      
          protected final void setRawResult(V value) {
              result = value;
          }
          protected final boolean exec() {
              result = compute();
              return true;
          }
      
      }
    • RecursiveAction:在進行exec以後沒有返回結果;
      public abstract class RecursiveAction extends ForkJoinTask<Void> {
         
          protected abstract void compute();
      
          public final Void getRawResult() { return null; }
      
          protected final void setRawResult(Void mustBeNull) { }
      
          protected final boolean exec() {
              compute();
              return true;
          }
      
      } 
  • ForkJoinPool:fork/join框架的管理者,最原始的任務都要交給它來處理。它負責控制整個fork/join有多少個工做線程,工做線程的建立、機會都是由它來控制。它還負責workQueue隊列的建立和分配,每當建立一個工做線程,它負責分配對應的workQueue,而後它把接到的活都交給工做線程去處理。是整個fork/join的容器。
    • ForkJoinPool.WorkQueue:雙端隊列,負責存儲接收的任務;
  • ForkJoinWorkerThread:fork/join裏面真正幹活的」工人「,它繼承了Thread,因此本質是一個線程。它有一個ForkJoinPool.WorkQueue的隊列存放着它要乾的活,接活以前它要向ForkJoinPool註冊(registerWorker),拿到相應的workQueue,而後就從workQueue裏面拿任務出來處理。它是依附於ForkJoinPool而存活,若是ForkJoinPool銷燬了,它也會跟着結束。【每個ForkJoinWorkerThread線程都具備一個獨立的任務等待隊列workQueue。】
    • 當使用ForkJoinPool進行submit任務提交時,建立1個workQueue將任務放進去,而後進行fork任務切分,若是切分後的任務放的進去以前的workQueue就放進去,不行就隨機選取workQueue放進去,若是還放不了就建立一個新的workQueue放進去;
public class ForkJoinWorkerThread extends Thread {
    final ForkJoinPool pool;
    final ForkJoinPool.WorkQueue workQueue;
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
}

二.用法

  之前1+2+3+...+100這樣的處理能夠用for循環處理,如今使用fork/join來處理:從下面結果能夠看到,大任務被不斷的拆分紅小任務,而後添加到工做線程的隊列中,每一個小任務都會被工做線程從隊列中取出進行運行,而後每一個小任務的結果的合併也由工做線程執行,而後不斷的彙總成最終結果。【task經過ForkJoinPool來執行,分割的子任務添加到當前工做線程的隊列中,進入隊列的頭部,當一個工做線程中沒有任務時,會從其餘工做線程的隊列尾部獲取一個任務。(工做竊取:當前工做線程對應的隊列中沒有任務了,從其餘工做線程對應的隊列中取出任務進行操做,而後將操做結果返還給對應隊列的線程。)】多線程

public class MyFrokJoinTask extends RecursiveTask<Integer> {
    private int begin;
    private int end;

    public MyFrokJoinTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> result = pool.submit(new MyFrokJoinTask(1, 100));//提交任務
        System.out.println("計算的值:"+result.get());//獲得最終的結果

    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - begin <= 2) {
            for (int i = begin; i <= end; i++) {
                sum += i;
                System.out.println("i:"+i);
            }
        } else {
            MyFrokJoinTask d1 = new MyFrokJoinTask(begin, (begin + end) / 2);
            MyFrokJoinTask d2 = new MyFrokJoinTask((begin + end) / 2+1, end);
            d1.fork();//任務拆分
            d2.fork();//任務拆分
            Integer a = d1.join();//每一個任務的結果
            Integer b = d2.join();//每一個任務的結果
            sum = a + b;//彙總任務結果
            System.out.println("sum:" + sum + ",a:" + a + ",b:" + b);
        }
        System.out.println("name:"+Thread.currentThread().getName());
        return sum;
    }
}
//=========結果============
i:1
i:2
name:ForkJoinPool-1-worker-1
i:3
i:4
name:ForkJoinPool-1-worker-1
sum:10,a:3,b:7
name:ForkJoinPool-1-worker-1
i:5
i:6
i:7
name:ForkJoinPool-1-worker-1
sum:28,a:10,b:18
name:ForkJoinPool-1-worker-1
...............
...............
sum:91,a:28,b:63
sum:99,a:45,b:54
name:ForkJoinPool-1-worker-3
name:ForkJoinPool-1-worker-1
i:23
i:24
i:25
name:ForkJoinPool-1-worker-2
sum:135,a:63,b:72
name:ForkJoinPool-1-worker-2
sum:234,a:99,b:135
name:ForkJoinPool-1-worker-3
sum:325,a:91,b:234
name:ForkJoinPool-1-worker-1
sum:1275,a:325,b:950
name:ForkJoinPool-1-worker-1
sum:5050,a:1275,b:3775
name:ForkJoinPool-1-worker-1
計算的值:5050

三.分析

  ForkJoinPool

ForkJoinPool forkJoinPool = new ForkJoinPool();
//Runtime.getRuntime().availableProcessors()當前操做系統可使用的CPU內核數量
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
//this調用到下面這段代碼
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism), //並行度
            checkFactory(factory), //工做線程建立工廠
            handler, //異常處理handler
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任務隊列出隊模式 異步:先進先出,同步:後進先出
            "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
//上面的this最終調用到下面這段代碼
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
  • parallelism:可並行數量,fork/join框架將依據這個並行數量的設定,決定框架內並行執行的線程數量。並行的每個任務都會有一個線程進行處理;
  • factory當fork/join建立一個新的線程時,一樣會用到線程建立工廠。它實現了ForkJoinWorkerThreadFactory接口,使用默認的的接口實現類DefaultForkJoinWorkerThreadFactory來實現newThread方法建立一個新的工做線程;
    public static interface ForkJoinWorkerThreadFactory {
            /**
             * Returns a new worker thread operating in the given pool.
             */
            public ForkJoinWorkerThread newThread(ForkJoinPool pool);
        }
    
        static final class DefaultForkJoinWorkerThreadFactory
            implements ForkJoinWorkerThreadFactory {
            public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                return new ForkJoinWorkerThread(pool);
            }
        }
  • handler:異常捕獲處理器。當執行的任務出現異常,並從任務中被拋出時,就會被handler捕獲;
  • asyncMode:fork/join爲每個獨立的工做線程準備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。便可以使用先進先出的工做模式,也可使用後進先出的工做模式;

   Fork()和Join()

  fork/join框架中提供的fork()和join()是最重要的兩個方法,它們和parallelism(」可並行任務數量「)配合工做,能夠致使拆分的子任務T1.一、T1.2甚至TX在fork/join中不一樣的運行效果(上面1+2....+100的每次運行的子任務都是不一樣的)。即TX子任務或等待其餘已存在的線程運行關聯的子任務(sum操做),或在運行TX的線程中」遞歸「執行其餘任務(將1-50進行拆分後的子任務遞歸運行),或啓動一個新的線程執行子任務(運行1-50另外一邊拆分的任務,即50-100的子任務)。併發

  fork()用於將新建立的子任務放入當前線程的workQueue隊列中,fork/join框架將根據當前正在併發執行ForkJoinTask任務的ForkJoinWorkerThread線程狀態,決定是讓這個任務在隊列中等待,仍是建立一個新的ForkJoinWorkedThread線程運行它,又或者是喚起其餘正在等待任務的ForkJoinWorkerThread線程運行它。框架

  join()用於讓當前線程阻塞,直到對應的子任務完成運行並返回執行結果。或者,若是這個子任務存在於當前線程的任務等待隊列workQueue中,則取出這個子任務進行」遞歸「執行,其目的是儘快獲得當前子任務的運行結果,而後繼續執行。dom

  提交任務:

  1.  sumbit的第一次提交:ForkJoinPool.submit(ForkJoinTask<T> task) -> externalPush(task) -> externalSubmit(task)

    1. submit:異步

      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
              if (task == null)
                  throw new NullPointerException();
              externalPush(task);
              return task;
          }
      
          public <T> ForkJoinTask<T> submit(Callable<T> task) {
              ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
              externalPush(job);
              return job;
          }
      
          public <T> ForkJoinTask<T> submit(Runnable task, T result) {
              ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
              externalPush(job);
              return job;
          }
      
          public ForkJoinTask<?> submit(Runnable task) {
              if (task == null)
                  throw new NullPointerException();
              ForkJoinTask<?> job;
              if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                  job = (ForkJoinTask<?>) task;
              else
                  job = new ForkJoinTask.AdaptedRunnableAction(task);
              externalPush(job);
              return job;
          }
    2. externalPush:將任務添加到隨機選取的隊列中或新建立的隊列中;
      final void externalPush(ForkJoinTask<?> task) {
              WorkQueue[] ws; WorkQueue q; int m;
              int r = ThreadLocalRandom.getProbe();//當前線程的一個隨機數
              int rs = runState;//當前容器的狀態
              //若是隨機選取的隊列還有空位置能夠存放、隊列加鎖鎖定成功,任務就放入隊列中
              if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                  (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                  U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                  ForkJoinTask<?>[] a; int am, n, s;
                  if ((a = q.array) != null &&
                      (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                      int j = ((am & s) << ASHIFT) + ABASE;
                      U.putOrderedObject(a, j, task);//任務加入隊列中
                      U.putOrderedInt(q, QTOP, s + 1);//挪動下次任務存放的槽的位置
                      U.putIntVolatile(q, QLOCK, 0);//隊列解鎖
                      if (n <= 1)//當前數組元素少時,進行喚醒當前線程;或者當沒有活動線程或線程數較少時,添加新的線程
                          signalWork(ws, q);
                      return;
                  }
                  U.compareAndSwapInt(q, QLOCK, 1, 0);//隊列解鎖
              }
              externalSubmit(task);//升級版的externalPush
          }
      
      
          volatile int runState;               // lockable status鎖定狀態
          // runState: SHUTDOWN爲負數,其餘的爲2的次冪
          private static final int  RSLOCK     = 1;
          private static final int  RSIGNAL    = 1 << 1;//喚醒
          private static final int  STARTED    = 1 << 2;//啓動
          private static final int  STOP       = 1 << 29;//中止
          private static final int  TERMINATED = 1 << 30;//結束
          private static final int  SHUTDOWN   = 1 << 31;//關閉
    3. externalSubmit:隊列添加任務失敗,進行升級版操做,即建立隊列數組和建立隊列後,將任務放入新建立的隊列中;
      private void externalSubmit(ForkJoinTask<?> task) {
          int r;                                    // initialize caller's probe
          if ((r = ThreadLocalRandom.getProbe()) == 0) {
              ThreadLocalRandom.localInit();
              r = ThreadLocalRandom.getProbe();
          }
          for (;;) {//自旋
              WorkQueue[] ws; WorkQueue q; int rs, m, k;
              boolean move = false;
              /**
              *ForkJoinPool執行器中止工做了,拋出異常
              *ForkJoinPool extends AbstractExecutorService
              *abstract class AbstractExecutorService implements ExecutorService
              *interface ExecutorService extends Executor
              *interface Executor執行提交的對象Runnable任務
              */
              if ((rs = runState) < 0) {
                  tryTerminate(false, false);    // help terminate
                  throw new RejectedExecutionException();
              }
              //第一次遍歷,隊列數組未建立,進行建立
              else if ((rs & STARTED) == 0 ||     // initialize初始化
                       ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                  int ns = 0;
                  rs = lockRunState();
                  try {
                      if ((rs & STARTED) == 0) {
                          U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                 new AtomicLong());
                          // create workQueues array with size a power of two
                          int p = config & SMASK; // ensure at least 2 slots,config是CPU核數
                          int n = (p > 1) ? p - 1 : 1;
                          n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                          n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                          workQueues = new WorkQueue[n];//建立
                          ns = STARTED;
                      }
                  } finally {
                      unlockRunState(rs, (rs & ~RSLOCK) | ns);
                  }
              }
              //第三次遍歷,把任務放入隊列中
              else if ((q = ws[k = r & m & SQMASK]) != null) {
                  if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                      ForkJoinTask<?>[] a = q.array;
                      int s = q.top;
                      boolean submitted = false; // initial submission or resizing
                      try {                      // locked version of push
                          if ((a != null && a.length > s + 1 - q.base) ||
                              (a = q.growArray()) != null) {
                              int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                              U.putOrderedObject(a, j, task);
                              U.putOrderedInt(q, QTOP, s + 1);
                              submitted = true;
                          }
                      } finally {
                          U.compareAndSwapInt(q, QLOCK, 1, 0);
                      }
                      if (submitted) {
                          signalWork(ws, q);
                          return;
                      }
                  }
                  move = true;                   // move on failure
              }
              //第二次遍歷,隊列數組爲空,建立隊列
              else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                  q = new WorkQueue(this, null);
                  q.hint = r;
                  q.config = k | SHARED_QUEUE;
                  q.scanState = INACTIVE;
                  rs = lockRunState();           // publish index
                  if (rs > 0 &&  (ws = workQueues) != null &&
                      k < ws.length && ws[k] == null)
                      ws[k] = q;                 // else terminated
                  unlockRunState(rs, rs & ~RSLOCK);
              }
              else
                  move = true;                   // move if busy
              if (move)
                  r = ThreadLocalRandom.advanceProbe(r);
          }
      }
  2. fork任務切分的提交:ForkJoinTask.fork() -> ForkJoinWorkerThread.workQueue.push(task)/ForkJoinPool.common.externalPush(task) -> ForkJoinPool.push(task)/externalPush(task)

    1. fork:
      public final ForkJoinTask<V> fork() {
              Thread t;
              if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)//當前線程是workerThread,任務直接放入workerThread當前的workQueue
                  ((ForkJoinWorkerThread)t).workQueue.push(this);
              else
                  ForkJoinPool.common.externalPush(this);//將任務添加到隨機選取的隊列中或新建立的隊列中
              return this;
          }
    2.  push:async

      public class ForkJoinPool extends AbstractExecutorService {
              static final class WorkQueue {
                  final void push(ForkJoinTask<?> task) {
                      ForkJoinTask<?>[] a; ForkJoinPool p;
                      int b = base, s = top, n;
                      if ((a = array) != null) {    // ignore if queue removed,隊列被移除忽略
                          int m = a.length - 1;     // fenced write for task visibility
                          U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);//任務加入隊列中
                          U.putOrderedInt(this, QTOP, s + 1);//挪動下次任務存放的槽的位置
                          if ((n = s - b) <= 1) {//當前數組元素少時,進行喚醒當前線程;或者當沒有活動線程或線程數較少時,添加新的線程
                              if ((p = pool) != null)
                                  p.signalWork(p.workQueues, this);
                          }
                          else if (n >= m)//數組全部元素都滿了進行2倍擴容
                              growArray();
                      }
                  }
                  final ForkJoinTask<?>[] growArray() {
                      ForkJoinTask<?>[] oldA = array;
                      int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;//2倍擴容或初始化
                      if (size > MAXIMUM_QUEUE_CAPACITY)
                          throw new RejectedExecutionException("Queue capacity exceeded");
                      int oldMask, t, b;
                      ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
                      if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                          (t = top) - (b = base) > 0) {
                          int mask = size - 1;
                          do { // emulate poll from old array, push to new array遍歷從舊數組中取出放到新數組中
                              ForkJoinTask<?> x;
                              int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                              int j    = ((b &    mask) << ASHIFT) + ABASE;
                              x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);//從舊數組中取出
                              if (x != null &&
                                  U.compareAndSwapObject(oldA, oldj, x, null))//將舊數組取出的位置的對象置爲null
                                  U.putObjectVolatile(a, j, x);//放入新數組
                          } while (++b != t);
                      }
                      return a;
                  }
              }
          }

  任務的消費

  任務的消費的執行鏈路是ForkJoinTask.doExec() -> RecursiveTask.exec()/RecursiveAction.exec() -> 覆蓋重寫的compute()ide

  1.  doExec:任務的執行入口

    final int doExec() {
            int s; boolean completed;
            if ((s = status) >= 0) {
                try {
                    completed = exec();//消費任務
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    s = setCompletion(NORMAL);//任務執行完設置狀態爲NORMAL,並喚醒其餘等待任務
            }
            return s;
        }
        protected abstract boolean exec();
        private int setCompletion(int completion) {
            for (int s;;) {
                if ((s = status) < 0)
                    return s;
                if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {//任務狀態修改成NORMAL
                    if ((s >>> 16) != 0)//狀態不是SMASK
                        synchronized (this) { notifyAll(); }//喚醒其餘等待任務
                    return completion;
                }
            }
        }
        /** The run status of this task 任務的運行狀態*/
        volatile int status; // accessed directly by pool and workers由ForkJoinPool池或ForkJoinWorkerThread控制
        static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
        static final int NORMAL      = 0xf0000000;  // must be negative
        static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
        static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
        static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
        static final int SMASK       = 0x0000ffff;  // short bits for tags

  任務真正執行處理邏輯

  任務提交到ForkJoinPool,最終真正的是由繼承Thread的ForkJoinWorkerThread的run方法來執行消費任務的,ForkJoinWorkerThread處理哪一個任務是由join來出隊的;源碼分析

    1. ForkJoinTask.join()

          public final V join() {
              int s;
              if ((s = doJoin() & DONE_MASK) != NORMAL)
                  reportException(s);
              return getRawResult();//獲得返回結果
          }
          private int doJoin() {
              int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
              /**
               * (s = status) < 0 判斷任務是否已經完成,完成直接返回s
               * 任務未完成:
               *          1)線程是ForkJoinWorkerThread,tryUnpush任務出隊而後消費任務doExec
               *              1.1)出隊或消費失敗,執行awaitJoin進行自旋,若是任務狀態是完成就退出,不然繼續嘗試出隊,直到任務完成或超時爲止;
               *          2)若是線程不是ForkJoinWorkerThread,執行externalAwaitDone進行出隊消費
               */
              return (s = status) < 0 ? s :
                  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                  (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                  tryUnpush(this) && (s = doExec()) < 0 ? s :
                  wt.pool.awaitJoin(w, this, 0L) :
                  externalAwaitDone();
          }
          private void reportException(int s) {
              if (s == CANCELLED)//取消
                  throw new CancellationException();
              if (s == EXCEPTIONAL)//異常
                  rethrow(getThrowableException());
          }
      1. awaitJoin:
            public class ForkJoinPool{
                final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
                    int s = 0;
                    if (task != null && w != null) {
                        ForkJoinTask<?> prevJoin = w.currentJoin;
                        U.putOrderedObject(w, QCURRENTJOIN, task);
                        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                            (CountedCompleter<?>)task : null;
                        for (;;) {
                            if ((s = task.status) < 0)//任務完成退出
                                break;
                            if (cc != null)//當前任務即將完成,檢查是否還有其餘的等待任務,若是有
                                //運行當前隊列的其餘任務,若當前的隊列中沒有任務了,則竊取其餘隊列的任務並運行
                                helpComplete(w, cc, 0);
                            //當前隊列沒有任務了,或隊列只剩下最後一個任務執行完了
                            else if (w.base == w.top || w.tryRemoveAndExec(task))
                                helpStealer(w, task);//竊取其餘隊列的任務
                            if ((s = task.status) < 0)
                                break;
                            long ms, ns;
                            if (deadline == 0L)
                                ms = 0L;
                            else if ((ns = deadline - System.nanoTime()) <= 0L)//超時退出
                                break;
                            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                                ms = 1L;
                            if (tryCompensate(w)) {//當前隊列阻塞了
                                task.internalWait(ms);//進行等待
                                U.getAndAddLong(this, CTL, AC_UNIT);
                            }
                        }
                        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
                    }
                    return s;
                }
            }
      2. externalAwaitDone:
            private int externalAwaitDone() {
                /**
                *   當前任務是CountedCompleter
                *   1)是則執行ForkJoinPool.common.externalHelpComplete()
                *   2)不然執行ForkJoinPool.common.tryExternalUnpush(this)進行任務出隊
                *       2.1)出隊成功,進行doExec()消費,不然進行阻塞等待
                */
                int s = ((this instanceof CountedCompleter) ? // try helping
                         ForkJoinPool.common.externalHelpComplete(
                             (CountedCompleter<?>)this, 0) :
                         ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
                if (s >= 0 && (s = status) >= 0) {//任務未完成
                    boolean interrupted = false;
                    do {
                        if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//任務狀態標記爲SIGNAL
                            synchronized (this) {
                                if (status >= 0) {
                                    try {
                                        wait(0L);//阻塞等待
                                    } catch (InterruptedException ie) {//有中斷異常
                                        interrupted = true;//設置中斷標識爲true
                                    }
                                }
                                else
                                    notifyAll();//任務完成喚醒其餘任務
                            }
                        }
                    } while ((s = status) >= 0);
                    if (interrupted)
                        Thread.currentThread().interrupt();//當前線程進行中斷
                }
                return s;
            }
            final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
                WorkQueue[] ws; int n;
                int r = ThreadLocalRandom.getProbe();
                //沒有任務直接結束,有任務則執行helpComplete
                //helpComplete:運行隨機選取的隊列的任務,若選取的隊列中沒有任務了,則竊取其餘隊列的任務並運行
                return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
                    helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
            } 
  1. run和工做竊取

  任務是由workThread來竊取的,workThread是一個線程。線程的全部邏輯都是由run()方法執行:

public class ForkJoinWorkerThread extends Thread {
    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();//初始化狀態
                pool.runWorker(workQueue);//處理任務隊列
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }
}
    public class ForkJoinPool{
        final void runWorker(WorkQueue w) {
            w.growArray();                   // allocate queue,隊列初始化
            int seed = w.hint;               // initially holds randomization hint
            int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
            for (ForkJoinTask<?> t;;) {//自旋
                if ((t = scan(w, r)) != null)//從隊列中竊取任務成功,scan()進行任務竊取
                    w.runTask(t);//執行任務,內部方法調用了doExec()進行任務的消費
                else if (!awaitWork(w, r))//隊列沒有任務了則結束
                    break;
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
            }
        }
    }
    1. scan:
      private ForkJoinTask<?> scan(WorkQueue w, int r) {
              WorkQueue[] ws; int m;
              if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
                  int ss = w.scanState;                     // initially non-negative
                  for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                      WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                      int b, n; long c;
                      if ((q = ws[k]) != null) {   //隨機選中了非空隊列 q
                          if ((n = (b = q.base) - q.top) < 0 &&
                              (a = q.array) != null) {      // non-empty
                              long i = (((a.length - 1) & b) << ASHIFT) + ABASE;  //從尾部出隊,b是尾部下標
                              if ((t = ((ForkJoinTask<?>)
                                        U.getObjectVolatile(a, i))) != null &&
                                  q.base == b) {
                                  if (ss >= 0) {
                                      if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出隊
                                          q.base = b + 1;
                                          if (n < -1)       // signal others
                                              signalWork(ws, q);
                                          return t;  //出隊成功,成功竊取一個任務!
                                      }
                                  }
                                  else if (oldSum == 0 &&   // try to activate 隊列沒有激活,嘗試激活
                                           w.scanState < 0)
                                      tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                              }
                              if (ss < 0)                   // refresh
                                  ss = w.scanState;
                              r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                              origin = k = r & m;           // move and rescan
                              oldSum = checkSum = 0;
                              continue;
                          }
                          checkSum += b;
                      }
         //k = k + 1表示取下一個隊列 若是(k + 1) & m == origin表示已經遍歷完全部隊列了 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
    2. ForkJoinPool.runTask:
              volatile int scanState;    // versioned, <0: inactive; odd:scanning,版本標記,小於0暫停,奇數進行掃描其餘任務
              static final int SCANNING     = 1;             // false when running tasks,有任務執行是false
              /**
               * Executes the given task and any remaining local tasks.
               * 執行給定的任務和任何剩餘的本地任務
               */
              final void runTask(ForkJoinTask<?> task) {
                  if (task != null) {
                      scanState &= ~SCANNING; // mark as busy,暫停掃描,當前有任務執行
                      (currentSteal = task).doExec();//執行竊取的任務
                      U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC,竊取的任務執行完置爲null
                      execLocalTasks();//執行本地的任務
                      ForkJoinWorkerThread thread = owner;
                      if (++nsteals < 0)      // collect on overflow,竊取計數溢出
                          transferStealCount(pool);//重置竊取計數
                      scanState |= SCANNING;//繼續掃描隊列
                      if (thread != null)
                          thread.afterTopLevelExec();
                  }
              }
              static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
                      @Override // to erase ThreadLocals,清除threadLocals
                      void afterTopLevelExec() {
                          eraseThreadLocals();
                      }
                      /**
                       * Erases ThreadLocals by nulling out Thread maps.
                       */
                      final void eraseThreadLocals() {
                          U.putObject(this, THREADLOCALS, null);//threadLocals置爲null
                          U.putObject(this, INHERITABLETHREADLOCALS, null);//inheritablethreadlocals置爲null
                      }
              } 

四.總結

  對於fork/join來講,在使用時仍是存在下面的一些問題的:

  • 在使用JVM的時候咱們要考慮OOM的問題,若是咱們的任務處理時間很是耗時,而且處理的數據很是大的時候,會形成OOM;
  • ForkJoin是經過多線程的方式進行處理任務,那麼咱們不得不考慮是否應該使用ForkJoin。由於當數據量不是特別大的時候,咱們沒有必要使用ForkJoin。由於多線程會涉及到上下文的切換,因此數據量不大的時候使用串行比使用多線程快;
    • 項目中進行本地測試發現,業務層Service進行excel表數據(數據量幾百)的複雜處理,進行單線程for循環統計消耗時間,而後與使用fork/join進行處理統計消耗時間,發現fork/join的消耗時間是單線程for的2倍;
相關文章
相關標籤/搜索