ThreadPoolExecutor代碼解析

派生體系
java.util.concurrent
  ThreadPoolExecutor
    AbstractExecutorService
      ExecutorService
        Executor
 
這個類是Executor框的核心實現,它的名字向咱們代表,它是使用thread pool實現的。這個thread pool主要解決了兩個問題:
  1. 執行大量的單個異步任務,通常狀況下,它能提高總體的性能。
  2. 執行由多個任務組成的任務集合,經過Future列表返回每一個任務的執行結果。
 
設計原理

 
重要概念
爲了可以在更多上下文環境中使用,ThreadPool定義了一些概念,這些概念都直接或間接對應着可調節參數,若是不瞭解這些概念含義,很難正確地使用這些參數。下面來看一下這些概念及其含義:
 
當前,核心,最小,最大線程數(poolSize, corePoolSize, minimumPoolSize, maximumPoolSize)
poolSize: 當前處於運行和空閒狀態的總線程數。
corePoolSize: 核心線程數, 當poolSize<=corePoolSize時,存在的線程稱爲coreThread。
minimumPoolSize: 最小線程數,當minimumPoolsize = allowCoreThreadTimeOut ? 0 : corePoolSize ,
maximunPoolSize: 最大線程數。
ThreadPool在運行過程當中會自動的調節線程數量(poolSize), 通常來講,poolSize處於[corePoolSize maximumPoolSize]區間以內。
當用戶調用execute提交一個任務時,若是poolSize<corePoolSize, 會建立一個新線程處理這個任務。若是若是poolSize處於[corePoolSize maximumPoolSize]區間內,只有隊列盡是纔會建立新線程。不管如何,poolSize不會大於maximumPoolSize。
默認狀況下,ThreadPool沒有收到任何任務時pooSize = 0, 只有當ThreadPool開始收到任務以後纔會建立線程。可是能夠經過覆蓋prestartCoreThread或prestartAllCoreThreads方法改變這種行爲,提早建立線程。
 
線程工廠--ThreadFactory
ThreadPool使用實現了ThreadFactory接口的實現建立新線程。Executors工廠類提供了defaultThreadFactory方法,該方法返回一個默認的ThreadFactory實例。使用這個實例建立的線程具備相同的優先級,是非後臺線程,命名上使用相同的前綴。若是不滿意這這些行爲,能夠本身實現一個ThreadFactory交給ThreadPool使用。
 
保持存活時間(keepAliveTime)
若是poolSize > corePoolSize, 當一個線程的空閒時間大於keepAliveTime, 它會被終止掉。默認狀況下當poolSize <= corePoolSize時,keepAliveTime不會有影響,若是調用 allowCoreThreadTimeOut(true), 可讓keepAliveTime在這個時間也起做用。
 
任務排隊(queuing)
任何BlockingQueue的實例均可以用於保存排隊的任務。不一樣的BlockingQueue實現決定了不一樣的排隊策略:
SynchronousQueue: 同步隊列,當提交一個任務時,要求ThreadPool中當前有至少一個空閒線程,或者能夠建立新的線程(poolSize < maximumPoolSize)當即執行這個任務,不然ThreadPool會拒絕這個任務。
LinkedBlockingQueue: 無限制的隊列(只受限於可以使用的內存), 不會處於full狀態, offer方法不會返回false,這意味這ThreadPool的pooSize<=corePoolSize, 不會建立大於corePoolSize的線程數。
ArrayBlockingQueue: 有限制的隊列, 受限於它的capacity。當poolSize == corePoolSize且隊列沒滿時, 新提交的任務會追加到隊列中排隊執行。 當poolSize在[corePoolSize maximumPooSize)區間同時隊被填列滿時,將會建立新的線程。直到poolSize == maximumPoolSize位置。 若是隊列被填滿同時pooSize == maximumPoolSize,新的任務會被拒絕。
 
拒絕任務(rejected tasks)
當ThreadPool遇到如下兩種狀況時會觸發拒絕任務策略:
  1. 正常狀況下BlockingQueue被填滿,同時poolSize == maximumPoolSize。
  2. 被關閉
ThreadPool使用RejectedExecutionHandler處理丟棄動做,默認定義了4中丟棄策略:
ThreadPoolExecutor.AbortPolicy: 拋出RejectedExecutionException異常。
ThreadPoolExecutor.CallerRunsPolicy: 本身執行這個被拋棄的任務。
ThreadPoolExecutor.DiscardPolicy: 悄無聲息的丟棄掉這人任務。
 
狀態
ThreadPool定義了5狀態
RUNNING: 接受新提交的任務,執行隊列中發任務。
SHUTDOWN: 不接受新提交的任務,但仍然會執行隊列中的人。
STOP: 不接受新提交的任務,不執行隊列中的任務,並且會打斷正在執行中的任務。
TIDYING: 全部的任務都終止了,而且線程數爲0,當全部的線程都過渡到TIDYING狀態後會調用treminated方法。
TERMINATED: treminated方法調用已經完成。
 
狀態之間的轉換關係
RUNNING --> SHUTDOWN
調用shutdown()
(RUNNING或SHUTDOWN) -- > STOP
調用shutdownNow()
SHUTDOWN --> TIDYING
隊列爲空,同時線程數爲0
TIDYING --> TREMINATED
treminated()執行完成。
 
向ThreadPool提交任務: execute
 
ThreadPoolExecutor實例建立以後,在沒有調用execute提交任務以前,ThreadPool中是沒有線程的,線程的建立是依賴exeute來驅動的。能夠說,exeute是ThreadPoolExecutor運行的觸發器,全部我選擇先從exeute方法開始分析代碼。
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) { // 若是線程數小於 corePoolSize, 建立一個新線程。
    if (addWorker(command, true))
     return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) { // 若是處於RUNNGIN狀態把任務放到隊列中
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command)) / /再次檢查線程狀態,若是不是RUNNING狀態,把任務從隊列刪除,而後拒絕這個任務
      reject(command);
    else if (workerCountOf(recheck) == 0) // 若是線程數爲0,建立一個新線程
      addWorker(null, false);
  }
  /* 若是運行到這裏說明當前不是出於RUNNING狀態,或處於RUNNING狀態但隊列已經被填滿
  *嘗試建立新的線程執行這個任務,若是失敗,拒絕這任務
  */
  else if (!addWorker(command, false))
    reject(command);
}
以上就是exeute代碼,它很簡單,但其中ctl成員變量比較費解。ctl是AtomicInteger類型,它被用來打包保存ThreadPoolExecutor的狀態和線程數。
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
它初始化時,把狀態設置成RUNNING,下面來看看它的結構
 
高位 ---- > 低位
運算狀態(run state)
線程數(workerCount)
31 -- 29
28 -- 0
狀態位
RUNNING
1
1
1
 
SHUTDOWN
 
0
0
0
 
STOP
0
0
1
 
TIDYING
0
1
0
 
TREMINATED
0
1
1
 
知道了這些數據的保存方式,把他們取出來,只須要一些簡單的位運算就能夠了。
狀態的大小關係 RUNNING < SHUTDOWN < STOP < TIDYING < TREMINATED,
runStateOf(clt.get()) < SHUTDOWN RUNNING狀態
runStateOf(clt.get()) >= SHUTDOWN 非RUNNING狀態
這個大小關係要記住,這樣理解代碼會更快。
 
建立新線程
ThreadPool把線程封裝成Worker對對象,添加worker就是添加線程,addWorker方法作的事情就是添加線程。
private boolean addWorker(Runnable firstTask, boolean core) {
    /* 這段代碼的做用是確保知足一下條件的任意一個時才建立新線程
   *1.  處於RUNNING 狀態, 能夠接受新任務,能夠繼續執行隊列中的任務
   *2. 處於SHUTDOWN狀態,  隊列不是空,且當前沒有提交新任務
   */
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&   // 非RUNNINGG狀態
            ! (rs == SHUTDOWN &&  
               firstTask == null &&  // 當前提交的新任務
               ! workQueue.isEmpty())) //  隊列不是空
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) // 若是當前調用建立的是core線程,  確保當前線程數 <corePoolSize, 不然確保當前線程數< maximumPoolSize
                return false;
            if (compareAndIncrementWorkerCount(c))  // 原子操做,增長線程數
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 執行到這裏表示已經經過檢查能夠建立新線程,而且線程數已經加1
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {  // 再次檢查,確保當前仍然知足容許建立線程的條件
                    if (t.isAlive()) //  確保Thread尚未調用start()
                        throw new IllegalThreadStateException();
                    workers.add(w); // 把worker線程放進HashSet中
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();  // 啓動新線程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
 
線程的主循環
Worker實現了Runnable接口
private final class Worker extends AbstractQueuedSynchronizer  implements Runnable
構造方法
Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
}
建立線程時把Worker實例自己當作線程的Runnable產生,因此當線程啓動後,將會調用Worker的run方法。
public void run() {
        runWorker(this);
}
線程的主循環就在runWorker方法中實現
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;  // 若是firstTask!=null, 先執行firstTask
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 若是沒有firstTask,  從隊列中取出一個task, 若是沒有取到,退出線程
            w.lock();
        // 若是處於狀態>=STOP(前面已經講過狀態直接的大小關係), 確保線程處於interrupted狀態
       // 不然清除線程的interrupted狀態
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 執行任務前調用的方法,默認什麼都沒幹,用戶能夠根據須要覆蓋它
                Throwable thrown = null;
                try {
                    task.run(); // 執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 任務完成以後調用的方法, 默認什麼都沒幹,用戶能夠根據須要覆蓋它
                }
            } finally {
                task = null; // 把當前task置空,這樣才能調用getTask從隊列裏取出任務
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
       // 正常退出線程 completedAbruptly是true, 異常致使的線程退出爲false
        processWorkerExit(w, completedAbruptly);
    }
}
 
從隊列中獲得排隊的任務
在runWorker主循環中,除了第一次的任務從worker的firsTask(在它不是null的狀況下)取以外, 後面每次都是調用getTask從隊列中取出一個任務。
下面是getTask的代碼分析
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); // 獲得當前狀態
 
        //  若是當前狀態 > SHUTDOWN 退出線程
    //  若是當前狀態 == SHUTDOWN 且 隊列爲空,退出線程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();  //減小當前線程數
            return null;
        }
 
        int wc = workerCountOf(c); // 獲得當前的線程數
 
        // 線程是否容許超時的條件: 設置容許coreThread超時,或者當前線程數 > corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
    // 線程退出須要同時知足如下兩個條件條件: 
    // 1. 當前線程數>maximumPooSize 或 容許超時同時檢查到已經超時
    // 2. 當前線程數>1 或 隊列爲空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c)) // 減小當前線程數, 這個方法確保多線程環境下不會過多地結束線程。
                return null;
            continue;
        }
 
        try {
           // 取出一個任務。若是容許超時,調用poll,不然調用take
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // 已經超時,運行到這裏代表poll超時返回
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
getTask的功能除了取出一個任務之外,它還負責在條件知足的狀況下正常地結束一個線程
 
線程結束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) //  若是線程是因爲異常緣由結束的,這裏要糾正線程數
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);  // 把線程從HashSet中刪除
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); // 嘗試終止整個ThreadPool
 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {  // 若是當前狀態<STOP
        if (!completedAbruptly) { // 若是不是異常結束
             // 計算最小線程數min
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty()) 
                min = 1;
            if (workerCountOf(c) >= min)  // 若是當前線程數>=min直接返回
                return; // replacement not needed
        }
        // 建立新線程, 條件:
        // 當前線程正常結束
        // 當前線程異常結束,但當前線程數小於最小線程數
        addWorker(null, false);
    }
}
 
上面的代碼實現了線程的生命週期的管理,線程只有在ThreadPoolExecutor的狀態處於RUNNGIN或SHUTDOWN時才能夠存在。下面是這兩種狀態下線程的生存狀態:
RUNNING:
    容許coreThread超時: 線程空閒(意味着隊列爲空)時間超過 keepAliveTime, 線程會被結束, 直到線程數爲0。
    不容許coreThread超時:  線程空閒時間超過 keepAliveTime, 線程會被結束,直到線程數爲corePoolSize。
SHUDOWN:
      當線程把已經在隊列裏的全部任務執行完畢後,全部線程都會進入退出流程,最終退出。
 
整個ThreadPoolExecutor的狀態變遷
前面已經講過,ThreadPool的狀態和線程數被打包方進一個32整數中:
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
初始化把狀態設置成RUNNING, 線程爲0
調用shutdown時把狀態從RUNNING置爲SHUTDOWN,  隨後過渡到TIDYING->TREMINATED。
當調用shutdownNow時把狀態從(RUNNING 或 SHUTDOWN) 設置爲STOP,  隨後過渡到TIDYING->TREMINATED。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);    // 只有當前狀態<SHUTDOWN時才執行狀態設置的動做
        interruptIdleWorkers();  // 打斷全部空閒的的線程,讓這些線程有機會本身結束
        onShutdown(); //  回調方法,默認什麼都沒作,子類能夠覆蓋
    } finally {
        mainLock.unlock();
    }
    tryTerminate(); // 嘗試執行ThreadPool的結束操做
}
shutdownNow和shutdown的操做大體同樣,不一樣的是它把狀態設置成STOP,還會返回隊列中沒有來得及執行的任務list。
tryTerminate方法做用是嘗試結束整個ThreadPool, 它不必定會執行真正的結束動做。它在三個地方被調用, worker線程結束時,shudown中,shutdownNow中。
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
     //知足如下三個條件中的任何一個就當即返回
    //1. 處於RUNNGING狀態
    //2. 狀態>= TIDYING
    //3. 處於SHUTDOWN狀態,且隊列不是空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
    // 若是處於STOP狀態,且線程數不爲0,通知一個處於空閒的線程結束本身
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
    // 執行到這裏表示目前狀態>=SHUTDOWN,線程數已是0
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 總有一個線程會運行到這裏,把狀態置爲 TIDYING
                try {
                    terminated(); // 調用回調方面,默認什麼都沒幹,子類能夠覆蓋
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));  // 把狀態置爲TREMINATED, 自此整個ThreadPool纔算終結
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
tryTerminate之因此要在三個地方調用,是爲了保證當調用shutdown或shutdownNow以後,總有一個線程會完成最後的終結工做。
 
參數設置
分析完前面代碼後,再來使用它,它的參數怎麼設置天然就瞭然於心。
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
public void allowCoreThreadTimeOut(boolean value)
public void setCorePoolSize(int corePoolSize) 
public void setKeepAliveTime(long time, TimeUnit unit)
public void setMaximumPoolSize(int maximumPoolSize)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) 
public void setThreadFactory(ThreadFactory threadFactory)
相關文章
相關標籤/搜索