Java----線程池ThreadPoolExecutor實現原理剖析

繼承結構

繼承結構看起來很清晰,最頂層的Executor只提供了一個最簡單的void execute(Runnable command)方法,而後是ExecutorService,ExecutorService提供了一些管理相關的方法,例如關閉、判斷當前線程池的狀態等,另外不一樣於Executor#execute,ExecutorService提供了一系列方法,能夠將任務包裝成一個Future,從而使得任務提交方能夠跟蹤任務的狀態。而父類AbstractExecutorService則提供了一些默認的實現。安全

構造器

ThreadPoolExecutor的構造器提供了很是多的參數,每個參數都很是的重要,一不當心就容易踩坑,所以設置的時候,你必需要知道本身在幹什麼。性能優化

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit,

                          BlockingQueue<Runnable> workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler) {

    if (corePoolSize < 0 ||

        maximumPoolSize <= 0 ||

        maximumPoolSize < corePoolSize ||

        keepAliveTime < 0)

        throw new IllegalArgumentException();

    if (workQueue == null || threadFactory == null || handler == null)

        throw new NullPointerException();

    this.acc = System.getSecurityManager() == null ?

            null :

            AccessController.getContext();

    this.corePoolSize = corePoolSize;

    this.maximumPoolSize = maximumPoolSize;

    this.workQueue = workQueue;

    this.keepAliveTime = unit.toNanos(keepAliveTime);

    this.threadFactory = threadFactory;

    this.handler = handler;

}
  1. corePoolSize、 maximumPoolSize。線程池會自動根據corePoolSize和maximumPoolSize去調整當前線程池的大小。當你經過submit或者execute方法提交任務的時候,若是當前線程池的線程數小於corePoolSize,那麼線程池就會建立一個新的線程處理任務, 即便其餘的core線程是空閒的。若是當前線程數大於corePoolSize而且小於maximumPoolSize,那麼只有在隊列"滿"的時候纔會建立新的線程。所以這裏會有不少的坑,好比你的core和max線程數設置的不同,但願請求積壓在隊列的時候可以實時的擴容,但若是制定了一個無界隊列,那麼就不會擴容了,由於隊列不存在滿的概念。
  2. keepAliveTime。若是當前線程池中的線程數超過了corePoolSize,那麼若是在keepAliveTime時間內都沒有新的任務須要處理,那麼超過corePoolSize的這部分線程就會被銷燬。默認狀況下是不會回收core線程的,能夠經過設置allowCoreThreadTimeOut改變這一行爲。
  3. workQueue。即實際用於存儲任務的隊列,這個能夠說是最核心的一個參數了,直接決定了線程池的行爲,好比說傳入一個有界隊列,那麼隊列滿的時候,線程池就會根據core和max參數的設置狀況決定是否須要擴容,若是傳入了一個SynchronousQueue,這個隊列只有在另外一個線程在同步remove的時候才能夠put成功,對應到線程池中,簡單來講就是若是有線程池任務處理完了,調用poll或者take方法獲取新的任務的時候,新提交的任務纔會put成功,不然若是當前的線程都在忙着處理任務,那麼就會put失敗,也就會走擴容的邏輯,若是傳入了一個DelayedWorkQueue,顧名思義,任務就會根據過時時間來決定何時彈出,即爲ScheduledThreadPoolExecutor的機制。
  4. threadFactory。建立線程都是經過ThreadFactory來實現的,若是沒指定的話,默認會使用Executors.defaultThreadFactory(),通常來講,咱們會在這裏對線程設置名稱、異常處理器等。
  5. handler。即當任務提交失敗的時候,會調用這個處理器,ThreadPoolExecutor內置了多個實現,好比拋異常、直接拋棄等。這裏也須要根據業務場景進行設置,好比說當隊列積壓的時候,針對性的對線程池擴容或者發送告警等策略。

看完這幾個參數的含義,咱們看一下Executors提供的一些工具方法,只要是爲了方便使用,可是我建議最好少用這個類,而是直接用ThreadPoolExecutor的構造函數,多瞭解一下這幾個參數究竟是什麼意思,本身的業務場景是什麼樣的,好比線程池需不須要擴容、用不用回收空閒的線程等。多線程

public class Executors {



/*

* 提供一個固定大小的線程池,而且線程不會回收,因爲傳入的是一個無界隊列,至關於隊列永遠不會滿

* 也就不會擴容,所以須要特別注意任務積壓在隊列中致使內存爆掉的問題

*/

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue<Runnable>());

}





/*

*  這個線程池會一直擴容,因爲SynchronousQueue的特性,若是當前全部的線程都在處理任務,那麼

*  新的請求過來,就會致使建立一個新的線程處理任務。若是線程一分鐘沒有新任務處理,就會被回 

*  收掉。特別注意,若是每個任務都比較耗時,併發又比較高,那麼可能每次任務過來都會建立一個線 

*  程

*/

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());

}

}

源碼分析

既然是個線程池,那就必然有其生命週期:運行中、關閉、中止等。ThreadPoolExecutor是用一個AtomicInteger去的前三位表示這個狀態的,另外又重用了低29位用於表示線程數,能夠支持最大大概5億多,絕逼夠用了,若是之後硬件真的發展到可以啓動這麼多線程,改爲AtomicLong就能夠了。架構

狀態這裏主要分爲下面幾種:併發

  1. RUNNING:表示當前線程池正在運行中,能夠接受新任務以及處理隊列中的任務
  2. SHUTDOWN:再也不接受新的任務,但會繼續處理隊列中的任務
  3. STOP:再也不接受新的任務,也不處理隊列中的任務了,而且會中斷正在進行中的任務
  4. TIDYING:全部任務都已經處理完畢,線程數爲0,轉爲爲TIDYING狀態以後,會調用terminated()回調
  5. TERMINATED:terminated()已經執行完畢

同時咱們能夠看到全部的狀態都是用二進制位表示的,而且依次遞增,從而方便進行比較,好比想獲取當前狀態是否至少爲SHUTDOWN等,同時狀態以前有幾種轉換:框架

  1. RUNNING -> SHUTDOWN。調用了shutdown()以後,或者執行了finalize()
  2. (RUNNING 或者 SHUTDOWN) -> STOP。調用了shutdownNow()以後會轉換這個狀態
  3. SHUTDOWN -> TIDYING。當線程池和隊列都爲空的時候
  4. STOP -> TIDYING。當線程池爲空的時候
  5. IDYING -> TERMINATED。執行完terminated()回調以後會轉換爲這個狀態
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;



private static final int RUNNING    = -1 << COUNT_BITS;

private static final int SHUTDOWN   =  0 << COUNT_BITS;

private static final int STOP       =  1 << COUNT_BITS;

private static final int TIDYING    =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;



//因爲前三位表示狀態,所以將CAPACITY取反,和進行與操做便可

private static int runStateOf(int c)     { return c & ~CAPACITY; }

private static int workerCountOf(int c)  { return c & CAPACITY; }



//高三位+第三位進行或操做便可

private static int ctlOf(int rs, int wc) { return rs | wc; }



private static boolean runStateLessThan(int c, int s) {

    return c < s;

}



private static boolean runStateAtLeast(int c, int s) {

    return c >= s;

}



private static boolean isRunning(int c) {

    return c < SHUTDOWN;

}



//下面三個方法,經過CAS修改worker的數目

private boolean compareAndIncrementWorkerCount(int expect) {

    return ctl.compareAndSet(expect, expect + 1);

}



//只嘗試一次,失敗了則返回,是否重試由調用方決定

private boolean compareAndDecrementWorkerCount(int expect) {

    return ctl.compareAndSet(expect, expect - 1);

}



//跟上一個不同,會一直重試

private void decrementWorkerCount() {

    do {} while (! compareAndDecrementWorkerCount(ctl.get()));

}

下面是比較核心的字段,這裏workers採用的是非線程安全的HashSet,而不是線程安全的版本,主要是由於這裏有些複合的操做,好比說將worker添加到workers後,咱們還須要判斷是否須要更新largestPoolSize等,workers只在獲取到mainLock的狀況下才會進行讀寫,另外這裏的mainLock也用於在中斷線程的時候串行執行,不然若是不加鎖的話,可能會形成併發去中斷線程,引發沒必要要的中斷風暴。分佈式

private final ReentrantLock mainLock = new ReentrantLock();



private final HashSet<Worker> workers = new HashSet<Worker>();



private final Condition termination = mainLock.newCondition();



private int largestPoolSize;



private long completedTaskCount;

核心方法

拿到一個線程池以後,咱們就能夠開始提交任務,讓它去執行了,那麼咱們看一下submit方法是如何實現的。函數

public Future<?> submit(Runnable task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<Void> ftask = newTaskFor(task, null);

    execute(ftask);

    return ftask;

}



public <T> Future<T> submit(Callable<T> task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<T> ftask = newTaskFor(task);

    execute(ftask);

    return ftask;

}

這兩個方法都很簡單,首先將提交過來的任務(有兩種形式:Callable、Runnable)都包裝成統一的RunnableFuture,而後調用execute方法,execute能夠說是線程池最核心的一個方法。微服務

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    int c = ctl.get();

    /*

        獲取當前worker的數目,若是小於corePoolSize那麼就擴容,

        這裏不會判斷是否已經有core線程,而是隻要小於corePoolSize就會直接增長worker

     */

    if (workerCountOf(c) < corePoolSize) {

        /*

            調用addWorker(Runnable firstTask, boolean core)方法擴容

            firstTask表示爲該worker啓動以後要執行的第一個任務,core表示要增長的爲core線程

         */

        if (addWorker(command, true))

            return;

        //若是增長失敗了那麼從新獲取ctl的快照,好比可能線程池在這期間關閉了

        c = ctl.get();

    }

    /*

         若是當前線程池正在運行中,而且將任務丟到隊列中成功了,

         那麼就會進行一次double check,看下在這期間線程池是否關閉了,

         若是關閉了,好比處於SHUTDOWN狀態,如上文所講的,SHUTDOWN狀態的時候,

         再也不接受新任務,remove成功後調用拒絕處理器。而若是仍然處於運行中的狀態,

         那麼這裏就double check下當前的worker數,若是爲0,有可能在上述邏輯的執行

         過程當中,有worker銷燬了,好比說任務拋出了未捕獲異常等,那麼就會進行一次擴容,

         但不一樣於擴容core線程,這裏因爲任務已經丟到隊列中去了,所以就不須要再傳遞firstTask了,

         同時要注意,這裏擴容的是非core線程

     */

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        /*

            若是在上一步中,將任務丟到隊列中失敗了,那麼就進行一次擴容,

            這裏會將任務傳遞到firstTask參數中,而且擴容的是非core線程,

            若是擴容失敗了,那麼就執行拒絕策略。

         */

        reject(command);

}

這裏要特別注意下防止隊列失敗的邏輯,不一樣的隊列丟任務的邏輯也不同,例如說無界隊列,那麼就永遠不會put失敗,也就是說擴容也永遠不會執行,若是是有界隊列,那麼當隊列滿的時候,會擴容非core線程,若是是SynchronousQueue,這個隊列比較特殊,當有另一個線程正在同步獲取任務的時候,你才能put成功,所以若是當前線程池中全部的worker都忙着處理任務的時候,那麼後續的每次新任務都會致使擴容,固然若是worker沒有任務處理了,阻塞在獲取任務這一步的時候,新任務的提交就會直接丟到隊列中去,而不會擴容。高併發

上文中屢次提到了擴容,那麼咱們下面看一下線程池具體是如何進行擴容的:

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

        int c = ctl.get();

        //獲取當前線程池的狀態

        int rs = runStateOf(c);



        /*

            若是狀態爲大於SHUTDOWN, 好比說STOP,STOP上文說過隊列中的任務不處理了,也不接受新任務,

            所以能夠直接返回false不擴容了,若是狀態爲SHUTDOWN而且firstTask爲null,同時隊列非空,

            那麼就能夠擴容

         */

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

                firstTask == null &&

                ! workQueue.isEmpty()))

            return false;



        for (;;) {

            int wc = workerCountOf(c);

            /*

                若worker的數目大於CAPACITY則直接返回,

                而後根據要擴容的是core線程仍是非core線程,進行判斷worker數目

                是否超過設置的值,超過則返回

             */

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            /*

                經過CAS的方式自增worker的數目,成功了則直接跳出循環

             */

            if (compareAndIncrementWorkerCount(c))

                break retry;

            //從新讀取狀態變量,若是狀態改變了,好比線程池關閉了,那麼就跳到最外層的for循環,

            //注意這裏跳出的是retry。

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }



    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        //建立Worker

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                /*

                    獲取鎖,並判斷線程池是否已經關閉

                 */

                int rs = runStateOf(ctl.get());



                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // 若線程已經啓動了,好比說已經調用了start()方法,那麼就拋異常,

                        throw new IllegalThreadStateException();

                    //添加到workers中

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize) //更新largestPoolSize

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                //若Worker建立成功,則啓動線程,這麼時候worker就會開始執行任務了

                t.start();

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            //添加失敗

            addWorkerFailed(w);

    }

    return workerStarted;

} 



private void addWorkerFailed(Worker w) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        if (w != null)

            workers.remove(w);

        decrementWorkerCount();

        //每次減小worker或者從隊列中移除任務的時候都須要調用這個方法

        tryTerminate();

    } finally {

        mainLock.unlock();

    }

}

這裏有個貌似不太起眼的方法tryTerminate,這個方法會在全部可能致使線程池終結的地方調用,好比說減小worker的數目等,若是知足條件的話,那麼將線程池轉換爲TERMINATED狀態。另外這個方法沒有用private修飾,由於ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,而ScheduledThreadPoolExecutor也會調用這個方法。

final void tryTerminate() {

    for (;;) {

        int c = ctl.get();

        /*

            若是當前線程處於運行中、TIDYING、TERMINATED狀態則直接返回,運行中的沒

            什麼好說的,後面兩種狀態能夠說線程池已經正在終結了,另外若是處於SHUTDOWN狀態,

            而且workQueue非空,代表還有任務須要處理,也直接返回

         */

        if (isRunning(c) ||

            runStateAtLeast(c, TIDYING) ||

            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

            return;

        //能夠退出,可是線程數非0,那麼就中斷一個線程,從而使得關閉的信號可以傳遞下去,

        //中斷worker後,worker捕獲異常後,會嘗試退出,並在這裏繼續執行tryTerminate()方法,

        //從而使得信號傳遞下去

        if (workerCountOf(c) != 0) {

            interruptIdleWorkers(ONLY_ONE);

            return;

        }



        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            //嘗試轉換成TIDYING狀態,執行完terminated回調以後

            //會轉換爲TERMINATED狀態,這個時候線程池已經完整關閉了,

            //經過signalAll方法,喚醒全部阻塞在awaitTermination上的線程

            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

                try {

                    terminated();

                } finally {

                    ctl.set(ctlOf(TERMINATED, 0));

                    termination.signalAll();

                }

                return;

            }

        } finally {

            mainLock.unlock();

        }

        // else retry on failed CAS

    }

}



/**

 * 中斷空閒的線程

 * @param onlyOne

 */

private void interruptIdleWorkers(boolean onlyOne) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (Worker w : workers) {

            //遍歷全部worker,若以前沒有被中斷過,

            //而且獲取鎖成功,那麼就嘗試中斷。

            //鎖可以獲取成功,那麼代表當前worker沒有在執行任務,而是在

            //獲取任務,所以也就達到了只中斷空閒線程的目的。

            Thread t = w.thread;

            if (!t.isInterrupted() && w.tryLock()) {

                try {

                    t.interrupt();

                } catch (SecurityException ignore) {

                } finally {

                    w.unlock();

                }

            }

            if (onlyOne)

                break;

        }

    } finally {

        mainLock.unlock();

    }

}

Worker

下面看一下Worker類,也就是這個類實際負責執行任務,Worker類繼承自AbstractQueuedSynchronizer,AQS能夠理解爲一個同步框架,提供了一些通用的機制,利用模板方法模式,讓你可以原子的管理同步狀態、blocking和unblocking線程、以及隊列,具體的內容以後有時間會再寫,仍是比較複雜的。這裏Worker對AQS的使用相對比較簡單,使用了狀態變量state表示是否得到鎖,0表示解鎖、1表示已得到鎖,同時經過exclusiveOwnerThread存儲當前持有鎖的線程。另外再簡單提一下,好比說CountDownLatch, 也是基於AQS框架實現的,countdown方法遞減state,await阻塞等待state爲0。

private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable

{



    /** Thread this worker is running in.  Null if factory fails. */

    final Thread thread;



    /** Initial task to run.  Possibly null. */

    Runnable firstTask;



    /** Per-thread task counter */

    volatile long completedTasks;



    Worker(Runnable firstTask) {

        setState(-1); // inhibit interrupts until runWorker

        this.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);

    }



    /** Delegates main run loop to outer runWorker  */

    public void run() {

        runWorker(this);

    }

   protected boolean isHeldExclusively() {

        return getState() != 0;

    }



    protected boolean tryAcquire(int unused) {

        if (compareAndSetState(0, 1)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }



    protected boolean tryRelease(int unused) {

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

    }



    public void lock()        { acquire(1); }

    public boolean tryLock()  { return tryAcquire(1); }

    public void unlock()      { release(1); }

    public boolean isLocked() { return isHeldExclusively(); }



    void interruptIfStarted() {

        Thread t;

        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

            try {

                t.interrupt();

            } catch (SecurityException ignore) {

            }

        }

    }

}

注意這裏Worker初始化的時候,會經過setState(-1)將state設置爲-1,並在runWorker()方法中置爲0,上文說過Worker是利用state這個變量來表示鎖的狀態,那麼加鎖的操做就是經過CAS將state從0改爲1,那麼初始化的時候改爲-1,也就是表示在Worker啓動以前,都不容許加鎖操做,咱們再看interruptIfStarted()以及interruptIdleWorkers()方法,這兩個方法在嘗試中斷Worker以前,都會先加鎖或者判斷state是否大於0,所以這裏的將state設置爲-1,就是爲了禁止中斷操做,並在runWorker中置爲0,也就是說只能在Worker啓動以後纔可以中斷Worker。

另外線程啓動以後,其實就是調用了runWorker方法,下面咱們看一下具體是如何實現的。

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // 調用unlock()方法,將state置爲0,表示其餘操做能夠得到鎖或者中斷worker

    boolean completedAbruptly = true;

    try {

        /*

            首先嚐試執行firstTask,若沒有的話,則調用getTask()從隊列中獲取任務

         */

        while (task != null || (task = getTask()) != null) {

            w.lock();

            /*

                若是線程池正在關閉,那麼中斷線程。

             */

            if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted() &&

                    runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

                //執行beforeExecute回調

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    //實際開始執行任務

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    //執行afterExecute回調

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                //這裏加了鎖,所以沒有線程安全的問題,volatile修飾保證其餘線程的可見性

                w.completedTasks++;

                w.unlock();//解鎖

            }

        }

        completedAbruptly = false;

    } finally {

        //拋異常了,或者當前隊列中已沒有任務須要處理等

        processWorkerExit(w, completedAbruptly);

    }

}



private void processWorkerExit(Worker w, boolean completedAbruptly) {

    //若是是異常終止的,那麼減小worker的數目

    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

        decrementWorkerCount();



    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        //將當前worker中workers中刪除掉,並累加當前worker已執行的任務到completedTaskCount中

        completedTaskCount += w.completedTasks;

        workers.remove(w);

    } finally {

        mainLock.unlock();

    }



    //上文說過,減小worker的操做都須要調用這個方法

    tryTerminate();



    /*

        若是當前線程池仍然是運行中的狀態,那麼就看一下是否須要新增另一個worker替換此worker

     */

    int c = ctl.get();

    if (runStateLessThan(c, STOP)) {

        /*

            若是是異常結束的則直接擴容,不然的話則爲正常退出,好比當前隊列中已經沒有任務須要處理,

            若是容許core線程超時的話,那麼看一下當前隊列是否爲空,空的話則不用擴容。不然話看一下

            是否少於corePoolSize個worker在運行。

         */

        if (!completedAbruptly) {

            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            if (min == 0 && ! workQueue.isEmpty())

                min = 1;

            if (workerCountOf(c) >= min)

                return; // replacement not needed

        }

        addWorker(null, false);

    }

}



 private Runnable getTask() {

    boolean timedOut = false; // 上一次poll()是否超時了



    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);



        // 若線程池關閉了(狀態大於STOP)

        // 或者線程池處於SHUTDOWN狀態,可是隊列爲空,那麼返回null

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }



        int wc = workerCountOf(c);



        /*

            若是容許core線程超時 或者 不容許core線程超時但當前worker的數目大於core線程數,

            那麼下面的poll()則超時調用

         */

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;



        /*

            獲取任務超時了而且(當前線程池中還有不止一個worker 或者 隊列中已經沒有任務了),那麼就嘗試

            減小worker的數目,若失敗了則重試

         */

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }



        try {

            //從隊列中抓取任務

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            //走到這裏代表,poll調用超時了

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}

關閉線程池

關閉線程池通常有兩種形式,shutdown()和shutdownNow()。

public void shutdown() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();

        //經過CAS將狀態更改成SHUTDOWN,這個時候線程池不接受新任務,但會繼續處理隊列中的任務

        advanceRunState(SHUTDOWN);

        //中斷全部空閒的worker,也就是說除了正在處理任務的worker,其餘阻塞在getTask()上的worker

        //都會被中斷

        interruptIdleWorkers();

        //執行回調

        onShutdown(); // hook for ScheduledThreadPoolExecutor

    } finally {

        mainLock.unlock();

    }

    tryTerminate();

    //這個方法不會等待全部的任務處理完成才返回

}

public List<Runnable> shutdownNow() {

    List<Runnable> tasks;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();

        /*

            不一樣於shutdown(),會轉換爲STOP狀態,再也不處理新任務,隊列中的任務也不處理,

            並且會中斷全部的worker,而不僅是空閒的worker

         */

        advanceRunState(STOP);

        interruptWorkers();

        tasks = drainQueue();//將全部的任務從隊列中彈出

    } finally {

        mainLock.unlock();

    }

    tryTerminate();

    return tasks;

}



private List<Runnable> drainQueue() {

    BlockingQueue<Runnable> q = workQueue;

    ArrayList<Runnable> taskList = new ArrayList<Runnable>();

    /*

        將隊列中全部的任務remove掉,並添加到taskList中,

        可是有些隊列比較特殊,好比說DelayQueue,若是第一個任務還沒到過時時間,則不會彈出,

        所以這裏經過調用toArray方法,而後再一個一個的remove掉

     */

    q.drainTo(taskList);

    if (!q.isEmpty()) {

        for (Runnable r : q.toArray(new Runnable[0])) {

            if (q.remove(r))

                taskList.add(r);

        }

    }

    return taskList;

}

從上文中能夠看到,調用了shutdown()方法後,不會等待全部的任務處理完畢才返回,所以須要調用awaitTermination()來實現。

在此我向你們推薦一個架構學習交流羣。交流學習羣號:821169538  裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多。

public boolean awaitTermination(long timeout, TimeUnit unit)

    throws InterruptedException {

    long nanos = unit.toNanos(timeout);

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (;;) {

            //線程池若已經終結了,那麼就返回

            if (runStateAtLeast(ctl.get(), TERMINATED))

                return true;

            //若超時了,也返回掉

            if (nanos <= 0)

                return false;

            //阻塞在信號量上,等待線程池終結,可是要注意這個方法可能會由於一些未知緣由隨時喚醒當前線程,

            //所以須要重試,在tryTerminate()方法中,執行完terminated()回調後,代表線程池已經終結了,

            //而後會經過termination.signalAll()喚醒當前線程

            nanos = termination.awaitNanos(nanos);

        }

    } finally {

        mainLock.unlock();

    }

}

一些統計相關的方法

public int getPoolSize() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        //若線程已終結則直接返回0,不然計算works中的數目

       //想一下爲何不用workerCount呢?

        return runStateAtLeast(ctl.get(), TIDYING) ? 0

            : workers.size();

    } finally {

        mainLock.unlock();

    }

}



public int getActiveCount() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        int n = 0;

        for (Worker w : workers)

            if (w.isLocked())//上鎖的代表worker當前正在處理任務,也就是活躍的worker

                ++n;

        return n;

    } finally {

        mainLock.unlock();

    }

}





public int getLargestPoolSize() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        return largestPoolSize;

    } finally {

        mainLock.unlock();

    }

}



//獲取任務的總數,這個方法慎用,如果個無解隊列,或者隊列擠壓比較嚴重,會很蛋疼

public long getTaskCount() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        long n = completedTaskCount;//好比有些worker被銷燬後,其處理完成的任務就會疊加到這裏

        for (Worker w : workers) {

            n += w.completedTasks;//疊加歷史處理完成的任務

            if (w.isLocked())//上鎖代表正在處理任務,也算一個

                ++n;

        }

        return n + workQueue.size();//獲取隊列中的數目

    } finally {

        mainLock.unlock();

    }

}





public long getCompletedTaskCount() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        long n = completedTaskCount;

        for (Worker w : workers)

            n += w.completedTasks;

        return n;

    } finally {

        mainLock.unlock();

    }

}
相關文章
相關標籤/搜索