JAVA-ThreadPoolExecutor筆記

常量分析

// 線程池控制器,高3位表明狀態,低29位表示線程池的大小
// 例如:11100000000000000000000000000001,高3位111表明線程池處於RUNNING狀態,低29位00000000000000000000000000001表明當前有一個工做線程.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS爲29
private static final int COUNT_BITS = Integer.SIZE - 3;

// 線程池最大容量爲2^30 - 1,也就是ctl的低29位所有爲1.
// 二進制值:000111111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// RUNNING:11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

// SHUTDOWN:00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// STOP:00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

// TIDYING:01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

// TERMINATED:01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 計算線程池當前的狀態.
// 例如:當前ctl(也就是傳入的參數c)爲01100000000000000000000000000111, ~CAPACITY爲CAPACITY取反.
// 那麼c & ~CAPACITY也就是01100000000000000000000000000111 & 11100000000000000000000000000000,結果爲:01100000000000000000000000000000
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 與runStateOf方法相似,不過這是求當前線程池中的線程數量
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 求控制器,rs表明線程池的runState,wc表明線程池當前的線程數量,作或操做其實就是求出當前線程池的控制器的值.
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

主流程解析

  • int corePoolSize:核心線程數.
  • int maximumPoolSize:最大線程數.
  • long keepAliveTime:除核心線程以外的其餘線程保持存活的時間.
  • TimeUnit unit:時間單位.
  • BlockingQueue workQueue:任務隊列.當前核心線程沒法知足任務需求時,先將任務放入隊列中.
  • ThreadFactory threadFactory:線程工廠.
  • RejectedExecutionHandler handler:拒絕策略處理器.
public void execute(Runnable command) {
    // 任務爲null,直接拋出異常
    if (command == null)
        throw new NullPointerException();
    
    // 獲取控制器    
    int c = ctl.get();
    // 計算工做線程數是否小於corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 添加工做線程
        if (addWorker(command, true))
            // 添加成功,返回
            return;
            
        // 從新獲取控制器,可能有其餘線程對其進行了修改.
        c = ctl.get();
    }
    
    // 線程池是RUNNING狀態,而且任務放入隊列成功.
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次獲取控制器
        int recheck = ctl.get();
        // 線程狀態不是RUNNING,從隊列中刪除任務,並調用RejectedExecutionHandler進行處理.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0) // 判斷工做線程數是否爲0,爲0則添加工做線程.
            addWorker(null, false);
    }
    // 線程池的任務數大於corePoolSize,而且隊列也已經放滿,則添加工做線程來對任務進行處理,若是添加失敗,執行reject策略.
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

添加工做線程流程解析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 狀態判斷
        // rs >= SHUTDOWN表示狀態爲SHUTDOWN、STOP、TIDYING或者TERMINATED
        // 總體意思爲,若是當前線程池的狀態爲非RUNNING狀態,而且狀態不爲SHUTDOWN或者入參任務不爲null或者任務隊列爲空的狀況下,再也不建立工做線程.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
            
        // 該循環的意思相似於CAS操做,經過循環不停的去對線程池的工做線程數量+1.
        for (;;) {
            // 獲取工做線程數量.
            int wc = workerCountOf(c);
            // 若是工做線程的數量大於CAPACITY(2^30 - 1)或者大於了設定的參數值(當core爲true時是corePoolSize,不然爲maximumPoolSize),直接返回失敗
            // 返回失敗以後的處理有兩種狀況:若是core爲true的話,會把任務放入任務隊列中;若是core爲false的話,會執行reject策略.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 利用原子類的CAS操做對工做線程數量+1
            if (compareAndIncrementWorkerCount(c))
                // +1成功,退出retry循環.
                break retry;
            c = ctl.get();  // Re-read ctl
            // 狀態發生了變化,則從新進行retry循環,從新進行狀態檢查,不然只需在當前循環中繼續cas操做.
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 獲取鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // 若是狀態爲RUNNING或者狀態爲SHUTDOWN而且任務爲null.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 判斷線程是否已啓動.
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加入工做線程隊列
                    workers.add(w);
                    
                    // 更新線程池的大小(largestPoolSize:線程池的工做線程的最大數量),加鎖主要是爲了更新該字段.
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            
            // 若是添加到工做線程隊列成功,則啓動線程,將workerStarted參數設置爲true.
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 添加工做線程失敗的處理邏輯.
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼

addWorkerFailed(w)方法解析程序員

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // worker不爲null,從工做線程隊列中移除.
            workers.remove(w);
        // worker數量-1.
        decrementWorkerCount();
        // 檢查terminate.
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}


final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        
        // 若是狀態時RUNNING,或者狀態是TIDYING或TERMINATED,或者狀態是SHUTDOWN而且任務隊列不爲空的狀況,直接返回.
        // 狀態時RUNNING的狀況,不能執行terminate操做
        // 狀態時TIDYING或者TERMINATED的狀況,不須要再執行一次terminate操做.
        // 當是SHUTDOWN狀態可是任務隊列不爲空的話,說明還有任務須要執行,也沒法執行terminate操做.
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
            
        // 若是worker的數量不爲0,則將其中一個worker的中斷標誌設置爲true,並返回.
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將狀態設置爲TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // hook方法.空實現
                    terminated();
                } finally {
                    // 將狀態設置爲TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 喚醒全部在maintain lock上等待的線程.
                    // 好比,若是主線程使用了executor的awaitTermination方法,那麼就會在該語句以後被喚醒.
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}


private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 線程處於非interrupted狀態,而且獲取到了worker對象的鎖
            // 這裏獲取worker對象鎖的主要緣由是防止線程正在執行任務而被中斷.
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
複製代碼

Worker解析

Worker繼承了AQS,而且實現了Runnable接口.bash

Worker(Runnable firstTask) {
  // 這裏state是AQS中一個volatile變量,Worker中的tryLock和unlock方法都是基於該變量實現的.
  // 將state的值設置爲-1,禁止在執行runWorker以前被中斷.
  setState(-1);
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);
}

public void run() {
    // 重點方法.
    runWorker(this);
}


final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 釋放鎖(將state的值設置爲0),使調用interruptIdleWorkers()方法的線程能夠獲取鎖來設置線程的中斷標誌.
    w.unlock(); // allow interrupts
    
    // 是否忽然完成,好比用戶代碼發生異常,致使直接走到finally代碼塊中.
    boolean completedAbruptly = true;
    try {
        // task不爲null,或者從任務隊列中獲取task不爲null.
        // 若設置了中斷標誌爲true,那麼在getTask方法上阻塞的線程能夠直接拋出InterruptedException,從而結束線程.
        while (task != null || (task = getTask()) != null) {
            // 加鎖,防止在執行任務的過程當中線程被中斷.
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 上面的英文意思很清楚,這裏想說的是爲何代碼要這麼寫
            // 查看Thread的源碼能夠看到Thread.interrupted()方法會清除掉當前線程的中斷標誌.
            // 所以當||操做符前面的條件爲false時(也就是狀態不爲STOP、TIDYING、TERMINATED)時,就會調用Thread.interrupted()方法,將線程的中斷標誌清除掉.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
                
            // 執行任務.
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 請看下面的代碼分析.
        processWorkerExit(w, completedAbruptly);
    }
}


private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 狀態檢查
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // 判斷是否須要超時
        // wc > corePoolSize表示工做線程數大於corePoolSize,在獲取任務的時候要加超時操做.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 就是檢查上一次操做是否超時以及任務隊列是否爲空.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // worker線程數-1
            if (compareAndDecrementWorkerCount(c))
                // -1成功,返回null,使線程退出while循環,正常結束.
                return null;
            // 繼續for循環.
            continue;
        }
        
        try {
            // 從任務隊列中拉取任務,若是timed爲false,則線程會一直阻塞,直到任務隊列中有值爲止;若是timed爲true,超時返回null,將timedOut設置爲true,而後在上面的if判斷中進行判斷.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}


private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly若是是true,也就是忽然完成,業務代碼拋出了異常,worker的數量-1.
    // 若是是worker是正常退出的話,只有多是設置了allowCoreThreadTimeOut,該字段的意思是容許核心worker線程超時.
    // 意思是就算工做線程數小於或等於corePoolSize,當某個線程未在keepAliveTime內獲取到任務時,也將退出循環,結束線程.
    if (completedAbruptly)
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 完成的任務數+1
        completedTaskCount += w.completedTasks;
        // 從隊列中移除worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    // 參考上面該方法的解析
    tryTerminate();
    int c = ctl.get();

    if (runStateLessThan(c, STOP)) {
        // 線程正常退出.
        if (!completedAbruptly) {
            // 若是容許核心線程數超時,則min爲0,不然爲corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若是min爲0,而且任務隊列不爲空,則最小線程數爲1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            
            // 若是工做線程的數量大於等於min,則退出方法.
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 添加工做線程,參考上面addWorker方法解析.
        addWorker(null, false);
    }
}
複製代碼

拒絕策略

在ThreadPoolExecutor中實現了4種默認的拒絕策略ui

  • CallerRunsPolicy:在executor沒有被shutdown的狀況下,直接執行任務.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
複製代碼
  • AbortPolicy:這個就厲害了,直接拋出RejectedExecutionException異常!
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                            " rejected from " +
                                            e.toString());
}
複製代碼
  • DiscardPolicy:直譯過來就是,悄咪咪的將任務丟棄掉,噓!不能讓程序員知道!
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
複製代碼
  • DiscardOldestPolicy:將任務隊列中最老的任務丟棄掉(這麼老了就別佔着茅坑不拉屎啦,對於FIFO隊列是丟棄掉下一個將被執行的任務,若是是優先級隊列那麼這種策略將會丟棄優先級最高的任務,所以最好不要將該策略與優先級隊列一塊兒使用),而後將新任務放入execute方法中執行(能不能執行或者佔個坑就看運氣了)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
  }
}
複製代碼

shutDown和shutDownNow

shutdown和shutDownNow的區別主要是,shutdown會等線程將當前任務執行完成才進行interrupt操做,而shutDownNow是無論線程是否正在執行任務都進行interrupt操做.shutdown是將線程池狀態修改成SHUTDOWN,而shutDownNow是將線程池的狀態修改成STOP.this

  • shutdown:
public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
      checkShutdownAccess();
      // 將狀態設置爲SHUTDOWN
      advanceRunState(SHUTDOWN);
      // 這個方法是和shutDownNow的區別.interruptIdleWorkers方法上面已經分析過
      interruptIdleWorkers();
      onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
      mainLock.unlock();
  }
  tryTerminate();
}
複製代碼
  • shutDownNow:
public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
      checkShutdownAccess();
      // 將狀態設置爲STOP
      advanceRunState(STOP);
      // 和shutdown調用的方法也不同.
      interruptWorkers();
      tasks = drainQueue();
  } finally {
      mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

private void interruptWorkers() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
      for (Worker w : workers)
          // 主要是該方法.
          w.interruptIfStarted();
  } finally {
      mainLock.unlock();
  }
}

// Worker對象的方法
void interruptIfStarted() {
  Thread t;
  // state的狀態只有在Worker對象未調用runWorker以前纔會是負數,所以只要worker調用了runWorker方法,不論是加鎖仍是未加鎖,getState方法的返回值都是大於等於0的.
  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
          t.interrupt();
      } catch (SecurityException ignore) {
      }
  }
}
複製代碼

狀態機

相關文章
相關標籤/搜索