JAVA線程池原理與源碼分析


一、線程池經常使用接口介紹

1.一、Executor

public interface Executor {
void execute(Runnable command);
}

執行提交的Runnable任務。其中的execute方法在未來的某個時候執行給定的任務,該任務能夠在新線程、池化線程或調用線程中執行,具體由Executor的實現者決定。java

1.二、ExecutorService

ExecutorService繼承自Executor,下面挑幾個方法介紹:併發

1.2.一、shutdown()
void shutdown();

啓動有序關閉線程池,在此過程當中執行先前提交的任務,但不接受任何新任務。若是線程池已經關閉,調用此方法不會產生額外的效果。此方法不等待之前提交的任務完成執行,可使用awaitTermination去實現。異步

1.2.二、shutdownNow()
List<Runnable> shutdownNow();

嘗試中止全部正在積極執行的任務, 中止處理等待的任務,並返回等待執行的任務列表。 此方法不等待之前提交的任務完成執行,可使用awaitTermination去實現。除了盡最大努力中止處理積極執行的任務外,沒有任何保證。例如,典型的實現是:經過Thread#interrupt取消任務執行,可是任何未能響應中斷的任務均可能永遠不會終止。函數

1.2.三、isShutdown()
boolean isShutdown();

返回線程池關閉狀態。oop

1.2.四、isTerminated()
boolean isTerminated();

若是關閉後全部任務都已完成,則返回 true。注意,除非首先調用了shutdown或shutdownNow,不然isTerminated永遠不會返回true。測試

1.2.五、awaitTermination(long timeout, TimeUnit unit)
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

線程阻塞阻塞,直到全部任務都在shutdown請求以後執行完畢,或者超時發生,或者當前線程被中斷(以先發生的狀況爲準)。this

1.2.六、submit
<T> Future<T> submit(Callable<T> task);

提交一個value-returning任務以執行,並返回一個表示該任務未決結果的Future。 Future的 get方法將在成功完成任務後返回任務的結果。spa

1.三、ScheduledExecutorService

安排命令在給定的延遲以後運行,或者按期執行,繼承自ExecutorService接口由如下四個方法組成:線程

//在給定延遲以後啓動任務,返回ScheduledFuture
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//建立並執行一個週期性操做,該操做在給定的初始延遲以後首次啓動,而後在給定的週期內執行;
//若是任務的任何執行遇到異常,則禁止後續執行。不然,任務只會經過執行器的取消或終止而終止。
//若是此任務的任何執行時間超過其週期,則後續執行可能會延遲開始,但不會併發執行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//建立並執行一個週期性操做,該操做在給定的初始延遲以後首次啓動,而後在一次執行的終止和下一次執行的開始之間使用給定的延遲。
//若是任務的任何執行遇到異常,則禁止後續執行。不然,任務只會經過執行器的取消或終止而終止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

1.四、ThreadFactory

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

按需建立新線程的對象。設計

1.五、Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

返回任務結果也可能拋出異常。

1.六、Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

Future表示異步計算的結果。方法用於檢查計算是否完成,等待計算完成並檢索計算結果。只有當計算完成時,纔可使用方法get檢索結果,若是須要,能夠阻塞,直到準備好爲止。取消由cancel方法執行。還提供了其餘方法來肯定任務是否正常完成或被取消。一旦計算完成,就不能取消計算。

1.七、Delayed

public interface Delayed extends Comparable<Delayed> {
    //在給定的時間單位中返回與此對象關聯的剩餘延遲
    long getDelay(TimeUnit unit);
}

一種混合風格的接口,用於標記在給定延遲以後應該執行的對象。

1.八、ScheduledFuture

public interface ScheduledFuture<V> extends Delayed, Future<V> {}

二、線程池工做流程

線程池的主要工做流程.jpg

新任務進來時:

  1. 若是當前運行的線程少於corePoolSize,則建立新線程(核心線程)來執行任務。
  2. 若是運行的線程等於或多於corePoolSize ,則將任務加入BlockingQueue。
  3. 若是BlockingQueue隊列已滿,則建立新的線程(非核心)來處理任務。
  4. 若是核心線程與非核心線程總數超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler拒絕策略。

三、ThreadPoolExecutor介紹

構造方法:

public ThreadPoolExecutor(
int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

參數說明:

  • corePoolSize

除非設置了 allowCoreThreadTimeOut,不然要保留在線程池中的線程數(即便它們是空閒的)。

  • maximumPoolSize

線程池中容許的最大線程數。

  • keepAliveTime

當線程數大於corePoolSize時,這是多餘的空閒線程在終止新任務以前等待新任務的最長時間。

  • unit

keepAliveTime參數的時間單位。

  • workQueue

用於在任務執行前保存任務的隊列。這個隊列只包含execute方法提交的Runnable任務。

  • threadFactory

執行程序建立新線程時使用的工廠。

  • handler

因爲達到線程邊界和隊列容量而阻塞執行時使用的處理程序。

3.一、BlockingQueue

  • SynchronousQueue
    不存儲元素的阻塞隊列,一個插入操做,必須等待移除操做結束,每一個任務一個線程。使用的時候maximumPoolSize通常指定成Integer.MAX_VALUE。
  • LinkedBlockingQueue
    若是當前線程數大於等於核心線程數,則進入隊列等待。因爲這個隊列沒有最大值限制,即全部超過核心線程數的任務都將被添加到隊列中。
  • ArrayBlockingQueue
    能夠限定隊列的長度,接收到任務的時候,若是沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,若是達到了,則入隊等候,若是隊列已滿,則新建線程(非核心線程)執行任務,又若是總線程數到了maximumPoolSize,而且隊列也滿了,則執行拒絕策略。
  • DelayQueue
    隊列內元素必須實現Delayed接口,這就意味着你傳進去的任務必須先實現Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,纔會執行任務。
  • priorityBlockingQuene
    具備優先級的無界阻塞隊列。

3.二、RejectedExecutionHandler

有4個ThreeadPoolExecutor內部類。

  • AbortPolicy

直接拋出異常,默認策略。

  • CallerRunsPolicy

用調用者所在的線程來執行任務。

  • DiscardOldestPolicy

丟棄阻塞隊列中靠最前的任務,並執行當前任務。
四、DiscardPolicy
直接丟棄任務。

最好自定義飽和策略,實現RejectedExecutionHandler接口,如:記錄日誌或持久化存儲不能處理的任務。

3.三、線程池大小設置

  • CPU密集型

儘可能使用較小的線程池,減小CUP上下文切換,通常設置爲CPU核心數+1。

  • IO密集型

能夠適當加大線程池數量,IO多,因此在等待IO的時候,充分利用CPU,通常設置爲CPU核心數2倍。
可是對於一些特別耗時的IO操做,盲目的用線程池可能也不是很好,經過異步+單線程輪詢,上層再配合上一個固定的線程池,效果可能更好,參考Reactor模型。

  • 混合型

視具體狀況而定。

3.四、任務提交

  • Callable

經過submit函數提交,返回Future對象。

  • Runnable

經過execute提交,沒有返回結果。

3.五、關閉線程池

  • shutdown()

僅中止阻塞隊列中等待的線程,那些正在執行的線程就會讓他們執行結束。

  • shutdownNow()

不只會中止阻塞隊列中的線程,並且會中止正在執行的線程。

四、線程池實現原理

4.一、 線程池狀態

線程池的內部狀態由AtomicInteger修飾的ctl表示,其高3位表示線程池的運行狀態,低29位表示線程池中的線程數量。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

主池控制狀態ctl是一個原子整數,包含兩個概念字段:

  • workerCount:指示有效線程數。
  • runState:指示是否運行、關閉等。

爲了將這兩個字段打包成一個整型,因此將workerCount限制爲(2^29)-1個線程,而不是(2^31)-1個線程。

workerCount是工做線程數量。該值可能與實際活動線程的數量存在暫時性差別,例如,當ThreadFactory在被請求時沒法建立線程,以及退出的線程在終止前仍在執行bookkeeping時。 用戶可見的池大小報告爲工做線程集的當前大小。

runState提供了生命週期,具備如下值:

  • RUNNING:接受新任務並處理排隊的任務
  • SHUTDOWN:不接受新任務,而是處理隊列的任務。
  • STOP:不接受新任務,不處理隊列的任務,中斷正在進行的任務。
  • TIDYING:全部任務都已終止,workerCount爲零,過渡到狀態TIDYING的線程將運行terminated()鉤子方法。
  • TERMINATED:terminated()方法執行完畢。

爲了容許有序比較,這些值之間的數值順序很重要。運行狀態會隨着時間單調地增長,但不須要達到每一個狀態。轉換:
線程池內部狀態轉換圖.png

  • RUNNING -> SHUTDOWN

在調用shutdown()時,能夠隱式地在finalize()中調用。

  • (RUNNING or SHUTDOWN) -> STOP

調用shutdownNow()。

  • SHUTDOWN -> TIDYING

當隊列和池都爲空時。

  • STOP -> TIDYING

當池是空的時候。

  • TIDYING -> TERMINATED

當terminated()鉤子方法完成時。

當狀態達到TERMINATED時,在awaitTermination()中等待的線程將返回。

下面看如下其餘狀態信息:

//Integer.SIZE爲32,COUNT_BITS爲29
private static final int COUNT_BITS = Integer.SIZE - 3;
//2^29-1 最大線程數
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 即高3位爲111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務;
* 111 0 0000 0000 0000 0000 0000 0000 0000
* -1 原碼:0000 ... 0001 反碼:1111 ... 1110 補碼:1111 ... 1111
* 左移操做:後面補 0
* 111 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 即高3位爲000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
* 000 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 即高3位爲001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,並且會中斷正在* 運行的任務;
* 001 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 即高3位爲010,全部任務都已終止,workerCount爲零,過渡到狀態TIDYING的線程將運行terminated()鉤子方法;
* 010 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 即高3位爲011,terminated()方法執行完畢;
* 011 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//根據ctl計算runState
private static int runStateOf(int c) {
//2^29   =  001 0 0000 0000 0000 0000 0000 0000 0000
//2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
//~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
//假設c爲 STOP 001 0 0000 0000 0000 0000 0000 0000 0000
// 最終值:    001 0 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY;
}
//根據ctl計算 workerCount
private static int workerCountOf(int c) {
//2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
//假設c =   000 0 0000 0000 0000 0000 0000 0000 0001  1個線程
//最終值:  000 0 0000 0000 0000 0000 0000 0000 0001  1
    return c & CAPACITY;
}
// 根據runState和workerCount計算ctl
private static int ctlOf(int rs, int wc) {
//假設 rs: STOP  001 0 0000 0000 0000 0000 0000 0000 0000
//假設 wc:       000 0 0000 0000 0000 0000 0000 0000 0001  1個線程
//最終值:       001 0 0000 0000 0000 0000 0000 0000 0001
    return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
//RUNNING狀態爲負數,確定小於SHUTDOWN,返回線程池是否爲運行狀態
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//試圖增長ctl的workerCount字段值。
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
//嘗試減小ctl的workerCount字段值。
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
//遞減ctl的workerCount字段。這隻在線程忽然終止時調用(請參閱processWorkerExit)。在getTask中執行其餘遞減。
private void decrementWorkerCount() {
    do {
    } while (!compareAndDecrementWorkerCount(ctl.get()));
}

Doug Lea大神的設計啊,感受計算機的基礎真的是數學。

4.二、 內部類Worker

Worker繼承了AbstractQueuedSynchronizer,而且實現了Runnable接口。
維護瞭如下三個變量,其中completedTasks由volatile修飾。

//線程這個工做程序正在運行。若是工廠失敗,則爲空。
final Thread thread;
//要運行的初始任務。多是null。
Runnable firstTask;
//線程任務計數器
volatile long completedTasks;

構造方法:

//使用ThreadFactory中給定的第一個任務和線程建立。
Worker(Runnable firstTask) {
    //禁止中斷,直到運行工做程序
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

既然實現了Runnable接口,必然實現run方法:

//Delegates main run loop to outer runWorker
public void run() {
    //核心
    runWorker(this);
}

4.三、runWorker(Worker w)執行任務

先看一眼執行流程圖,再看源碼,會更清晰一點:
runWorker.png

首先來看runWorker(Worker w)源碼:

final void runWorker(Worker w) {
    //獲取當前線程
    Thread wt = Thread.currentThread();
    //獲取第一個任務
    Runnable task = w.firstTask;
    //第一個任務位置置空
    w.firstTask = null;
    //由於Worker實現了AQS,此處是釋放鎖,new Worker()是state==-1,此處是調用Worker類的 release(1)方法,將state置爲0。Worker中interruptIfStarted()中只有state>=0才容許調用中斷
    w.unlock();
    //是否忽然完成,若是是因爲異常致使的進入finally,那麼completedAbruptly==true就是忽然完成的
    boolean completedAbruptly = true;
    try {
        //先處理firstTask,以後依次處理其餘任務
        while (task != null || (task = getTask()) != null) {
            //獲取鎖
            w.lock();
            //若是池中止,確保線程被中斷;若是沒有,請確保線程沒有中斷。這須要在第二種狀況下從新檢查,以處理清除中斷時的shutdownNow競爭
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                //自定義實現
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    //自定義實現
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //任務完成數+1
                w.completedTasks++;
                //釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Worker的結束後的處理工做
        processWorkerExit(w, completedAbruptly);
    }
}

下面再來看上述源碼中的getTask()與processWorkerExit(w, completedAbruptly)方法:

4.3.一、getTask()

根據當前配置設置執行阻塞或定時等待任務,或者若是該worker由於任何緣由必須退出,則返回null,在這種狀況下workerCount將遞減。

返回空的狀況:

  1. 大於 maximumPoolSize 個 workers(因爲調用setMaximumPoolSize)
  2. 線程池關閉
  3. 線程池關閉了而且隊列爲空
  4. 這個worker超時等待任務,超時的worker在超時等待以前和以後均可能終止(即allowCoreThreadTimeOut || workerCount > corePoolSize),若是隊列不是空的,那麼這個worker不是池中的最後一個線程。
private Runnable getTask() {
    // Did the last poll() time out?
    boolean timedOut = false;
    for (; ; ) {
        //獲取線程池狀態
        int c = ctl.get();
        int rs = runStateOf(c);
        //僅在必要時檢查隊列是否爲空。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //遞減ctl的workerCount字段
            decrementWorkerCount();
            return null;
        }
        //獲取workerCount數量
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //線程超時控制
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            //嘗試減小ctl的workerCount字段
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //若是有超時控制,則使用帶超時時間的poll,不然使用take,沒有任務的時候一直阻塞,這兩個方法都會拋出InterruptedException
            Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take();
            //有任務就返回
            if (r != null)
                return r;
            //獲取任務超時,確定是走了poll邏輯
            timedOut = true;
        } catch (InterruptedException retry) {
            //被中斷
            timedOut = false;
        }
    }
}
4.3.一、processWorkerExit(Worker w, boolean completedAbruptly)

爲垂死的worker進行清理和bookkeeping。僅從工做線程調用。除非completedAbruptly被設置,不然假定workerCount已經被調整以考慮退出。此方法從工做集中移除線程,若是線程池因爲用戶任務異常而退出,或者運行的工做池小於corePoolSize,或者隊列非空但沒有工做池, 則可能終止線程池或替換工做池。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If abrupt, then workerCount wasn't adjusted
    // true:用戶線程運行異常,須要扣減
    // false:getTask方法中扣減線程數量
    if (completedAbruptly)
        //遞減ctl的workerCount字段。
        decrementWorkerCount();
    //獲取主鎖,鎖定
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //更新完成任務計數器
        completedTaskCount += w.completedTasks;
        //移除worker
        workers.remove(w);
    } finally {
        //解鎖
        mainLock.unlock();
    }
    // 有worker線程移除,多是最後一個線程退出須要嘗試終止線程池
    tryTerminate();
    int c = ctl.get();
    // 若是線程爲running或shutdown狀態,即tryTerminate()沒有成功終止線程池,則判斷是否有必要一個worker
    if (runStateLessThan(c, STOP)) {
        // 正常退出,計算min:須要維護的最小線程數量
        if (!completedAbruptly) {
            // allowCoreThreadTimeOut 默認false:是否須要維持核心線程的數量
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若是min ==0 或者workerQueue爲空,min = 1
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 若是線程數量大於最少數量min,直接返回,不須要新增線程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 添加一個沒有firstTask的worker
        addWorker(null, false);
    }
}

4.四、任務提交

提交有兩種:

  • Executor#execute(Runnable command)

Executor接口提供的方法,在未來的某個時候執行給定的命令.該命令能夠在新線程、池化線程或調用線程中執行,具體由Executor的實現者決定。

  • ExecutorService#submit(Callable<T> task)

提交一個value-returning任務以執行,並返回一個表示該任務未決結果的Future。Future的get方法將在成功完成任務後返回任務的結果。

4.五、任務執行

4.5.一、 execute(Runnable command)

任務執行流程圖:

execute.png

三步處理:

  1. 若是運行的線程小於corePoolSize,則嘗試用給定的命令做爲第一個任務啓動一個新線程。對addWorker的調用原子性地檢查runState和workerCount,所以能夠經過返回false來防止錯誤警報,由於錯誤警報會在不該該添加線程的時候添加線程。
  2. 若是一個任務能夠成功排隊,那麼咱們仍然須要再次檢查是否應該添加一個線程 (由於自上次檢查以來已有的線程已經死亡),或者池在進入這個方法後關閉。所以,咱們從新檢查狀態,若是必要的話,若是中止,則回滾隊列;若是沒有,則啓動一個新線程。
  3. 若是沒法對任務排隊,則嘗試添加新線程。 若是它失敗了,咱們知道pool被關閉或飽和,因此拒絕任務。
public void execute(Runnable command) {
    //任務爲空,拋出異常
    if (command == null)
        throw new NullPointerException();
   //獲取線程控制字段的值
   int c = ctl.get();
   //若是當前工做線程數量少於corePoolSize(核心線程數)
   if (workerCountOf(c) < corePoolSize) {
       //建立新的線程並執行任務,若是成功就返回
       if (addWorker(command, true))
            return;
       //上一步失敗,從新獲取ctl
       c = ctl.get();
    }
    //若是線城池正在運行,且入隊成功
    if (isRunning(c) && workQueue.offer(command)) {
        //從新獲取ctl
        int recheck = ctl.get();
        //若是線程沒有運行且刪除任務成功
        if (!isRunning(recheck) && remove(command))
            //拒絕任務
            reject(command);
        //若是當前的工做線程數量爲0,只要還有活動的worker線程,就能夠消費workerQueue中的任務
        else if (workerCountOf(recheck) == 0)
            //第一個參數爲null,說明只爲新建一個worker線程,沒有指定firstTask
            addWorker(null, false);
    } else if (!addWorker(command, false))
    //若是線程池不是running狀態 或者 沒法入隊列,嘗試開啓新線程,擴容至maxPoolSize,若是addWork(command, false)失敗了,拒絕當前command
        reject(command);
}

下面詳細看一下上述代碼中出現的方法:addWorker(Runnable firstTask, boolean core)。

4.5.1.一、addWorker(Runnable firstTask, boolean core)

addWorker.jpg

檢查是否能夠根據當前池狀態和給定的界限(核心或最大值)添加新worker,若是是這樣,worker計數將相應地進行調整,若是可能,將建立並啓動一個新worker, 並將運行firstTask做爲其第一個任務。 若是池已中止或有資格關閉,則此方法返回false。若是線程工廠在被請求時沒有建立線程,則返回false。若是線程建立失敗,要麼是因爲線程工廠返回null,要麼是因爲異常 (一般是Thread.start()中的OutOfMemoryError)),咱們將回滾。

private boolean addWorker(Runnable firstTask, boolean core) {
    //很久沒見過這種寫法了
    retry:
    //線程池狀態與工做線程數量處理,worker數量+1
    for (; ; ) {
        //獲取當前線程池狀態與線程數
        int c = ctl.get();
        //獲取當前線程池狀態
        int rs = runStateOf(c);
        // 僅在必要時檢查隊列是否爲空。若是池子處於SHUTDOWN,STOP,TIDYING,TERMINATED的時候 不處理提交的任務,判斷線程池是否能夠添加worker線程
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        //線程池處於工做狀態
        for (; ; ) {
            //獲取工做線程數量
            int wc = workerCountOf(c);
            //若是線程數量超過最大值或者超過corePoolSize或者超過maximumPoolSize 拒絕執行任務
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //試圖增長ctl的workerCount字段
            if (compareAndIncrementWorkerCount(c))
                //中斷外層循環
                break retry;
            // Re-read ctl
            c = ctl.get();
            //若是當前線程池狀態已經改變
            if (runStateOf(c) != rs)
                //繼續外層循環
                continue retry;
            //不然CAS因workerCount更改而失敗;重試內循環
        }
    }
    //添加到worker線程集合,並啓動線程,工做線程狀態
    boolean workerStarted = false;
    boolean workerAdded = false;
    //繼承AQS並實現了Runnable接口
    Worker w = null;
    try {
        //將任務封裝
        w = new Worker(firstTask);
        //獲取當前線程
        final Thread t = w.thread;
        if (t != null) {
            //獲取全局鎖
            final ReentrantLock mainLock = this.mainLock;
            //全局鎖定
            mainLock.lock();
            try {
                //持鎖時從新檢查。退出ThreadFactory故障,或者在獲取鎖以前關閉。
                int rs = runStateOf(ctl.get());
                //若是當前線程池關閉了
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                   //測試該線程是否活動。若是線程已經啓動而且尚未死,那麼它就是活的。                                       
                   if (t.isAlive())
                        throw new IllegalThreadStateException();
                   //入工做線程池
                   workers.add(w);
                   int s = workers.size();
                   //跟蹤最大的池大小
                   if (s > largestPoolSize)
                        largestPoolSize = s;
                   //狀態
                   workerAdded = true;
                }
            } finally {
                //釋放鎖
                mainLock.unlock();
            }
            //若是工做線程加入成功,開始線程的執行,並設置狀態
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //判斷工做線程是否啓動成功
        if (!workerStarted)
            //回滾工做線程建立
            addWorkerFailed(w);
    }
    //返回工做線程狀態
    return workerStarted;
}

再分析回滾工做線程建立邏輯方法:addWorkerFailed(w)。
回滾工做線程建立,若是存在,則從worker中移除worker, 遞減ctl的workerCount字段。,從新檢查終止,以防這個worker的存在致使終止。

private void addWorkerFailed(Worker w) {
    //獲取全局鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //若是存在,則從worker中移除worker
        if (w != null)
            workers.remove(w);
        //遞減ctl的workerCount字段。
        decrementWorkerCount();
        //從新檢查終止
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

其中的tryTerminate()方法:
若是是SHUTDOWN或者STOP 且池子爲空,轉爲TERMINATED狀態。若是有條件終止,可是workerCount不爲零,則中斷空閒worker,以確保關機信號傳播。必須在任何可能使終止成爲可能的操做以後調用此方法--在關機期間減小worker數量或從隊列中刪除任務。該方法是非私有的,容許從ScheduledThreadPoolExecutor訪問。

final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();
        //若是線程池處於運行中,或者阻塞隊列中仍有任務,返回
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
           return;
        //還有工做線程
        if (workerCountOf(c) != 0) {
            //中斷空閒工做線程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //獲取全局鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //設置ctl狀態TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //方法在執行程序終止時調用,默認什麼都不執行
                    terminated();
                } finally {
                    //完成terminated()方法,狀態爲TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //喚醒全部等待條件的節點
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

//方法在執行程序終止時調用,默認什麼都不執行
protected void terminated() {}

4.5.1.二、 reject(Runnable command)拒絕策略

爲給定的命令調用被拒絕的執行處理程序。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

tencent.jpg

相關文章
相關標籤/搜索