多線程學習筆記八之線程池ThreadPoolExecutor實現分析

簡介

  在Web開發中,若是要密集處理多個任務時,相對於每次都一個建立線程去執行任務,新建線程來執行任務相對來講是個更好的選擇,體如今如下三點:源碼分析

  1. 下降資源消耗。 經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  2. 提升響應速度。 當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  3. 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。

下面從最經常使用的線程池ThreadPoolExecutor的源碼分析如何實現線程池。ui

繼承結構

  Executor是最基礎的執行接口,只提供了一個execute(Runnable command)提交任務方法;ExecutorService接口繼承了Executor,在其上作了一些shutdown()、submit()的擴展,能夠說是真正的線程池接口AbstractExecutorService抽象類實現了ExecutorService接口中的大部分方法;TheadPoolExecutor繼承了AbstractExecutorService,是線程池的具體實現。
  this

實現分析

ThreadPoolExecutor類屬性

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 線程池的控制狀態(用來表示線程池的運行狀態(整形的高3位)和運行的worker數量(低29位))
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 偏移量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大工做線程數量(2^29 - 1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 線程運行狀態,總共有5個狀態,須要3位來表示(因此偏移量的29 = 32 - 3)
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 阻塞隊列,存放提交給線程池的任務
    private final BlockingQueue<Runnable> workQueue;
    // 可重入鎖
    private final ReentrantLock mainLock = new ReentrantLock();
    // 存放工做線程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 終止條件
    private final Condition termination = mainLock.newCondition();
    // 最大線程池容量
    private int largestPoolSize;
    // 已完成任務數量
    private long completedTaskCount;
    // 線程工廠
    private volatile ThreadFactory threadFactory;
    // 拒絕執行處理器
    private volatile RejectedExecutionHandler handler;
    // 線程等待運行時間
    private volatile long keepAliveTime;
    // 是否運行核心線程超時
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池的大小
    private volatile int corePoolSize;
    // 最大線程池大小
    private volatile int maximumPoolSize;
    // 默認拒絕執行處理器
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
}

線程池狀態

  線程池自己有兩個很重要的狀態信息:線程池的運行狀態和工做線程數,這兩個狀態信息都包含在變量ctl(int型,32位)中:ctl的高3位表示線程狀態runState,低29位表示工做線程worker的數量workCount。線程狀態信息以下:線程

  1. RUNNING:-1<<COUNT_BITS,即高3位爲1,低29位爲0,該狀態的線程池會接收新任務,會處理在阻塞隊列中等待處理的任務
  2. SHUTDOWN:0<<COUNT_BITS,即高3位爲000,低29位爲0,該狀態的線程池不會再接收新任務,但還會處理已經提交到阻塞隊列中等待處理的任務
  3. STOP:1<<COUNT_BITS,即高3位爲001,低29位爲0,該狀態的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,並且還會中斷正在運行的任務
  4. TIDYING:2<<COUNT_BITS,即高3位爲010,低29位爲0,全部任務都被終止了,workerCount爲0,爲此狀態時還將調用terminated()方法
  5. TERMINATED:3<<COUNT_BITS,即高3位爲011,低29位爲0,terminated()方法調用完成後變成此狀態

構造方法

  核心參數含義以下:code

  • corePoolSize:核心線程數量
  • maximumPoolSize:最大線程數量,可能大於corePoolSize,也可能等於
  • workQueue: 必須是BlockingQueue阻塞隊列。當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。經過workQueue,線程池實現了阻塞功能
  • keepAliveTime:線程池維護線程所容許的空閒時間。當線程池中的線程數量大於corePoolSize的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了keepAliveTime。
  • threadFactory:它是ThreadFactory類型的變量,用來建立新線程。默認使用Executors.defaultThreadFactory() 來建立線程。使用默認的ThreadFactory來建立線程時,會使新建立的線程具備相同的NORM_PRIORITY優先級而且是非守護線程,同時也設置了線程的名稱。
  • handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。若是阻塞隊列滿了而且沒有空閒的線程,這時若是繼續提交任務,就須要採起一種策略處理該任務。
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

execute(Runnable command)

  execute方法是向線程池提交任務的,此時線程池的狀態爲RUNNING(其餘狀態不接收新提交的任務),主要判斷:對象

  1. 若是運行的線程少於 corePoolSize,則建立新的工做線程來處理任務,即便線程池中的其餘線程是空閒的;
  2. 若是線程池中的線程數量大於等於 corePoolSize,且阻塞隊列未滿,將任務加入阻塞隊列workQueue;
  3. 若是線程池中的線程數量大於等於 corePoolSize 且小於 maximumPoolSize,則只有當workQueue滿時才建立新的線程去處理任務;
  4. 若是運行的線程數量大於等於maximumPoolSize,這時若是workQueue已經滿了,則經過handler所指定的策略來拒絕任務提交;
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //ctl記錄線程池狀態信息和線程池線程數
        int c = ctl.get();
        //比較當前線程數是否小於corePoolSize,若是小於則新建一個線程放入線程池中
        if (workerCountOf(c) < corePoolSize) {
            //成功加入則返回
            if (addWorker(command, true))
                return;
            //加入失敗,從新獲取ctl
            c = ctl.get();
        }
        //若是當前線程數大於等於corePoolSize,判斷線程池是否仍在運行,是的話加入阻塞隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次檢查線程池是否仍在運行
            if (! isRunning(recheck) && remove(command))
                reject(command);
            /** 線程池在運行可是工做線程數爲0,此時可能阻塞隊列有任務但線程池沒有工做線程池,
             * 若是配置了參數allowCoreThreadTimeOut(默認是false)爲true可能由於核心線程執行
             * 完任務且阻塞隊列也沒有線程等待獲取任務,此時屬於空閒線程,因爲超時會回收核心線程
            **/
            else if (workerCountOf(recheck) == 0)
            /** 傳false將會在addWorker方法中判斷線程池的工做線程數量和最大線程數量作比較
             * 傳一個空的任務,開啓一個工做線程,但這個工做線程會發現當前的任務是空,而後會去隊列中取任務
             * 這樣就避免了線程池的狀態是running,並且隊列中還有任務,但線程池卻不執行隊列中的任務
            **/
                addWorker(null, false);
        }
            /**
             * 若是執行到這裏,有兩種狀況:
             * 1. 線程池已經不是RUNNING狀態;
             * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。
             * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲 
             * maximumPoolSize;若是失敗則拒絕該任務
             **/
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker(Runnable firstTask, boolean core)

  addWorker方法用與建立工做線程,firstTask表示第一個任務,core爲true那麼線程數受corePoolSize制約,爲false則受maximumPoolSize制約。執行流程:blog

  • 檢查線程池狀態決定是否新建工做線程
  • 新建Worker對象並加入到集合中
  • 啓動工做線程
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //運行狀態
            int rs = runStateOf(c);

            /**
             * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務
             * 知足rs >= SHUTDOWN條件後接着判斷如下3個條件,只要有1個不知足,則返回false:
             * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保
             * 存的任務 2. firsTask爲空 3. 阻塞隊列不爲空
            **/
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //當前工做線程數
                int wc = workerCountOf(c);
                //檢查線程數量是否超出
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                 // 嘗試CAS增長workerCount,若是成功,則跳出第一個for循環
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //CAS失敗,從新獲取ctl的值
                c = ctl.get();  // Re-read ctl
                 // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        //CAS增長workCount成功,退出循環進入到這裏
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 根據firstTask來建立Worker對象
            w = new Worker(firstTask);
            // 每個Worker對象都會建立一個線程
            final Thread t = w.thread;
            if (t != null) {
                //上鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // rs < SHUTDOWN表示是RUNNING狀態;
                    // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
                    // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //將工做線程work加入到HashSet對象workers
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //啓動線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

內部類Worker

  線程池的工做線程是經過包裝成Worker對象,Worker類自己既實現了Runnable接口,又繼承了同步器AQS,實現了一個簡易的不可重入的互斥鎖,經過同步狀態state控制中斷:繼承

  • 初始AQS狀態爲-1,此時不容許中斷interrupt(),只有在worker線程啓動了,執行了runWoker(),將state置爲0,才能中斷,不容許中斷體如今:
  1. shutdown()線程池時,會對每一個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定將state從0->1,故初始狀態state==-1時tryLock()失敗,沒法interrupt()
  2. shutdownNow()線程池時,不用tryLock()上鎖,但調用worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯
  • 爲了防止某種狀況下,在運行中的worker被中斷,runWorker()每次運行任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操做須要先獲取worker的鎖,這樣就防止了中斷正在運行的線程
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{       
        private static final long serialVersionUID = 6138294804551838833L;
        //工做線程       
        final Thread thread;
        //新建Worker傳入的任務command,可能爲null
        Runnable firstTask;
        //執行完的任務數量
        volatile long completedTasks;

        //同步狀態state爲0表明爲鎖定,state爲1表明鎖定,state爲-1表明初始狀態
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //建立線程
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            runWorker(this);
        }        

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    }

runWorker(Worker w)

  runWork是工做線程執行任務的方法,執行過程以下:接口

  • 經過while循環步斷獲取任務
  • 檢查線程池運行狀態,若是處於STOP及以上,中斷線程;若是是RUNNING或SHUTDOWN,不中斷工做線程
  • task.run()執行任務
  • 當取得的任務task爲null退出循環,執行processWorkerExit方法,此時Work的工做線程run()方法執行完畢,線程銷燬
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //同步狀態state設置爲0,容許中斷
        w.unlock(); // allow interrupts
        //用於標識是否工做線程因爲異常忽然終止,在執行任務拋出異常或線程被中斷兩種狀況爲true
        boolean completedAbruptly = true;
        try {
            //循環取任務執行
            while (task != null || (task = getTask()) != null) {
                //上鎖,表示正在工做線程正在執行任務,不能響應中斷
                w.lock();
                /**
                 * 確保在線程池狀態在STOP及以上時,纔會被設置中斷標示,不然清除中斷標示,判斷如下兩個條件:
                 * 一、若是線程池狀態>=stop,且當前線程沒有設置中斷狀態,wt.interrupt()
                 * 二、若是一開始判斷線程池狀態<stop,但Thread.interrupted()爲true,即線程已經被中斷,又
                 * 清除了中斷標示,再次判斷線程池狀態是否>=stop(可能調用了shutdownNow關閉線程池)
                 **/
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask()

  當工做線程數達到corePoolSize,後續提交的任務就會放到阻塞隊列workQueue中,工做線程經過getTask方法從阻塞隊列取出任務,執行如下步驟:

  • 檢查線程池狀態及阻塞隊列是否爲空
  • 控制核心線程數(使工做線程數不超過corePoolSize)
  • 從阻塞隊列取任務
private Runnable getTask() {
        // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
        boolean timedOut = false; 
        
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /**
             * 1.rs>SHUTDOWN 因此rs至少等於STOP,這時再也不處理隊列中的任務,無論workQueue是否爲空都返回null
             * 2.rs = SHUTDOWN 因此rs>=STOP確定不成立,這時還須要處理隊列中的任務除非workQueue爲空
             * 若是以上條件知足,則將workerCount減1並返回null。由於若是當前線程池狀態的值是SHUTDOWN
             * 或以上時,不容許再向阻塞隊列中添加任務。
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /**
             * timed表示工做線程是否須要剔除,爲true
             * allowCoreThreadTimeOut默認爲false,表示核心線程不作超時控制
             * wc > corePoolSize 超過核心線程數
             * timed爲true下面的if條件經過返回null,從而剔除掉超過corePoolSize數目的線程,使線程數
             * 回覆corePoolSize            
             **/            
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /** 
             * 條件1:
             * wc > maximumPoolSize 檢查是否超出maximumPoolSize,線程池可能重置了maximumPoolSize
             * timed && timedOut 當前線程須要超時控制且上次取任務超時爲true
             * 條件2:若是線程數量大於1,或者阻塞隊列是空的
             * 兩個條件都爲true把workCount減一,返回null
             **/
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                //CAS失敗從新循環
                continue;
            }

            try {
               /**
                * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在
                * keepAliveTime時間內沒有獲取到任務,則返回null;
                * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。
                **/
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //若是r==null,說明是超時了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

processWorkerExit(Worker w, boolean completedAbruptly)

  當getTask返回null,會跳出runWork的while循環,此時工做線程的run方法執行完畢,線程會終止,同時會執行processWorkerExit方法,步驟以下:

  • 根據completedAbruptly參數調整線程池的工做線程數
  • 統計完成的任務數並從集合中移出Worker對象
  • 根據線程池狀態進行判斷是否結束線程池
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //若是是忽然終止,從新調整workCount
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //統計完成的任務數
            completedTaskCount += w.completedTasks;
            //從集合中移出Worker對象
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 根據線程池狀態進行判斷是否結束線程池
        tryTerminate();

        int c = ctl.get();
        //線程狀態小於STOP,即線程池處於RUNNING或SHUTDOWN狀態
        if (runStateLessThan(c, STOP)) {
            //檢查是否異常終止
            if (!completedAbruptly) {
                //若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker;
                //若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //忽然終止,添加一個Worker
            addWorker(null, false);
        }
    }

shutdown()

  關閉線程池,線程池狀態由RUNNING變爲SHUTDOWN,只處理已有任務再也不接收新提交的任務,中斷空閒線程。
爲何要中斷空閒線程:當線程池狀態爲RUNNING可是阻塞隊列爲空,allowCoreThreadTimeOut爲默認值false(既不支持核心線程超時回收),那麼工做線程必然堵塞在workQueue.take()方法上,而調用了shutdown()方法後線程池狀態變爲SHUTDOWN不接收新提交的任務,那麼阻塞隊列永遠爲空,因此須要經過中斷讓線程由阻塞狀態返回null。

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //檢查是否有關閉線程池權限
            checkShutdownAccess();
            //把線程池運行狀態切換爲SHUTDOWN       
            advanceRunState(SHUTDOWN);
            //中斷空閒線程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

interruptIdleWorkers()

  中斷空閒線程。

private void interruptIdleWorkers() {
        //false代表中斷全部空閒線程
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // t.isInterrupted()檢查線程是否已經中斷過
                // w.tryLock() runWork在執行任務會上鎖,執行完解鎖去阻塞隊列得到任務,若是tryLock成功
                //說明沒有執行任務,是空閒線程。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

tryTerminate()

  根據線程池狀態嘗試關閉線程池。這裏解釋一下interruptIdleWorkers(ONLY_ONE):
當到達workerCountOf(c) != 0這個判斷時,說明線程池處於SHUTDOWN狀態,且阻塞隊列已經爲空,這是若判斷成立,那麼還有工做線程等待在線程池上,會中斷一個空閒線程,這個被中斷的空閒線程的Worker返回null又會調用tryTerminate,從而把線程池關閉的消息傳給每一個線程,回收空閒線程。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /*
             * 當前線程池的狀態爲如下幾種狀況時,直接返回:
             * 1. RUNNING,由於還在運行中,不能中止;
             * 2. TIDYING或TERMINATED,由於線程池中已經沒有正在運行的線程了;
             * 3. SHUTDOWN而且等待隊列非空,這時要執行完workQueue中的task;
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //工做線程數不爲0
            if (workerCountOf(c) != 0) { // Eligible to terminate
                //中斷一個空閒線程(等待在阻塞隊列上獲取任務的線程)
                //中斷的線程在回收Worker時還會調用tryTerminate方法,從而回收空閒線程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            //到這裏說明工做線程數workCount爲0,線程池狀態置爲TIDYING
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdownNow()

  關閉線程池,運行狀態修改成 STOP, 中斷全部線程; 並返回未處理的任務

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //更改線程池狀態
            advanceRunState(STOP);
            // 中斷全部工做線程,不管是否空閒
            interruptWorkers();
            //取出阻塞隊列中沒有被執行的任務
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

interruptWorkers()

  不論線程是否空閒,中斷全部線程。

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    void interruptIfStarted() {
        Thread t;
        /**
         * getState() >= 0 同步狀態state=-1線程還沒啓動,大於等於0說明線程以及啓動,處於
         * 執行任務或空閒狀態。 
         * (t = thread) != null 線程不爲null 
         * !t.isInterrupted() 檢查線程是否被中斷過。
         **/
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }

總結

  本文分析了線程池ThreadPoolExecutor的實現,主要從向線程池提交任務和關閉線程池這兩個方法分析的,瞭解了線程池複用線程資源減小線程建立和切換的開銷背後的祕密。

相關文章
相關標籤/搜索