ThreadPoolExecutor源碼淺析

初始化

ThreadPoolExecutor重載了多個構造方法,不過最終都是調用的同一個:git

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

其中涉及了7個參數:github

  • corePoolSize:線程池維護的線程數,及時線程空閒也不關閉,除非設置了allowCoreThreadTimeOut(默認未設置)
  • maximumPoolSize:最大線程數,當須要的線程數超過corePoolSize時就會新建線程,但線程總數不會超過maximumPoolSize
  • keepAliveTime:超出corePoolSize的線程,在用完後空閒時間超過keepAliveTime的時間後就會終止(terminating)
  • TimeUnit unit:keepAliveTime的時間單位
  • BlockingQueue<Runnable> workQueue:當任務沒法當即被執行時,會被存儲在隊列中。不一樣類型的隊列會致使線程池不一樣的特性,這裏不深刻討論(有興趣能夠查看: 隊列爲 直接提交隊列SynchronousQueue,無界隊列LinkedBlockingQueue,有界隊列ArrayBlockingQueue時不一樣的特性,參考
  • ThreadFactory threadFactory:建立線程的工廠, 如常見的指定線程名字的工廠方法:new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build();
  • RejectedExecutionHandler handler:拒絕策略,當線程數達到maximumPoolSize,且workQueue已經沒法存儲更多任務時,採用拒絕策略。

ThreadPoolExecutor爲咱們提供了4種拒絕策略:shell

  • AbortPolicy默認策略,拋出異常RejectedExecutionException,告訴調用方已經來不及處理了,調用方須要處理異常和線程線程池來不及執行的任務
  • DiscardPolicy,靜默的忽略掉,無一致性要求的能夠這麼幹
  • DiscardOldestPolicy,從隊列裏拋棄掉最老的任務,無一致性要求的能夠這麼幹
  • CallerRunsPolicy,當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程中處理被拒絕的任務。能夠必定程度緩解當前線程不夠的狀況,可是若是當前任務執行所需時間不定,有卡住主線程的風險

再看看CallerRunsPolicy的實現:安全

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

可見是經過執行r.run()來佔用主線程執行的。less

全部的拒絕策略都是繼承RejectedExecutionHandler,因此咱們也能夠自定義拒絕策略。oop

ctl變量

ctl變量是ThreadPoolExecutor的一個屬性,ctl能夠理解爲control的簡寫,源碼中定義以下:源碼分析

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

源碼中ctl變量的註釋中解釋了該變量的含義,該變量包含了兩個含義,線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount)。 ctl用高3位來表示線程池的運行狀態, 用低29位來表示線程池內有效線程的數量。在源碼中,rs一般表示線程池運行狀態 , wc一般表示線程池中有效線程數量, 另外, ctl 也一般會簡寫做 c。ui

再看與ctl相關的幾個變量和方法:this

private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  • COUNT_BITS,表示用於標記線程數量的位數,32-3=29位
  • CAPACITY, 表示線程池最大能夠容納的線程數量,2^30-1
  • RUNNING,表示運行狀態,-1 << COUNT_BITS,前三位的值爲111,後29位爲0
  • SHUTDOWN,表示不接受新的任務,可是能夠處理阻塞隊列裏的任務。0<< COUNT_BITS,前三位的值爲000,後29位爲0。調用shutdown()方法會置爲該狀態。
  • STOP,該狀態不接受新的任務,不處理阻塞隊列裏的任務,中斷正在處理的任務。1<< COUNT_BITS,前三位的值爲001,後29位爲0。調用shutdownNow()方法會置爲該狀態
  • TIDYING,表示過渡狀態,2<< COUNT_BITS,前三位的值爲010,後29位爲0。此時表示全部的任務都執行完了,當前線程池已經沒有有效的線程,而且將要調用terminated方法
  • TERMINATED,表示終止狀態,3<< COUNT_BITS,前三位的值爲011,後29位爲0
  • runStateOf(int c) ,獲取線程池狀態,這裏c爲ctl變量,CAPACITY取反結果是前三位爲1,後29位爲0,與ctl與操做便可獲得狀態
  • workerCountOf(int c), 與runStateOf(int c) 相反取後29位,即線程數量
  • ctlOf(int rs, int wc),基於狀態和線程數量構造一個ctl變量

對於狀態能夠簡單理解爲:RUNNING爲-1,SHUTDOWN爲0,STOP爲1,TIDYING爲2,TERMINATED爲3。RUNNING變爲SHUTDOWN或者STOP後,再變爲TIDYING,再變爲TERMINATED。

添加任務

ThreadPoolExecutor繼承於AbstractExecutorService:

public class ThreadPoolExecutor extends AbstractExecutorService

AbstractExecutorService提供了最經常使用的三個添加任務到線程成的方法:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

能夠看到最終它們都是調用了execute方法,ThreadPoolExecutor中execute的實現以下:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

源碼中的這段註釋詳細的介紹了這段代碼的做用,該方法考慮三種狀況:

  1. 若是當前存活thread的數量小於corePoolSize,則嘗試開啓一個新的線程。若是建立成功則返回;若是建立失敗,則繼續後續步驟;
  2. 若是步驟1中建立失敗或者thread數量>=corePoolSize,那會進入該步驟。該步驟判斷線程池處於運行狀態,則嘗試將新任務加入隊列。

    1. 若是線程池處於運行狀態,且加入隊列成功,則再次判斷線程池是否處於運行狀態(防止在執行workQueue.offer(command)的時候線程池狀態改變)。若是線程池狀態改變則remove剛剛入隊的任務,並執行拒絕操做。若是在運行態,可是線程數爲0,則添加一個worker。
    2. 若是線程池不處於運行狀態加入隊列失敗則進入下一步驟
  3. 若是線程池不處於運行狀態或者處於運行狀態,可是thread數量>=corePoolSize且workQueue已滿,則會進入該步驟。該步驟會嘗試建立一個新的線程來執行任務。若是線程池線程總數達到maximumPoolSize 或者 建立線程時線程池狀態變化再也不處於運行狀態,則會建立失敗。

在上面的代碼中主要是經過addWorker方法添加新任務的,下面咱們就來分析下這個方法的實現

addWorker方法

源碼以下:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //rs >= SHUTDOWN,狀態不爲RUNNING
            //而且
            //rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()
            //一下幾種狀況
            //1. 狀態不爲RUNNING和SHUTDOWN,
            //2. 或者 狀態爲SHUTDOWN且task不爲null,
            //3. 或者 狀態爲SHUTDOWN, task爲null, workQueue 爲空,
            //則返回false,添加失敗
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //判斷是否超過線程數量的限制,
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //未超過限制則嘗試把線程數加1,成功跳出retry循環
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //線程數加1失敗則說明ctl有變化(狀態或數量), 從新獲取
                c = ctl.get();  // Re-read ctl
                //若是是狀態變化則循環外層,反之循環內層
                if (runStateOf(c) != rs) 
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            //從Worker構造方法能夠看到 
            //this.firstTask = firstTask;
            //this.thread = getThreadFactory().newThread(this);
            //故此firstTask爲null的時候, w.thread不爲null
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  //成功添加worker後,啓動線程
                    workerStarted = true;
                }
            }  //end of if (t != null) 
        } finally {
            //worker啓動失敗則移除worker, 數量減一
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

execute方法中在三個地方用不用的參數調用了addWorker方法:

  1. addWorker(command, true)
  2. addWorker(null, false)
  3. addWorker(command, false)

addWorker有兩個參數:Runnable firstTaskboolean core,前者表示要執行的任務,後者表示線程數量限制的類型(基於corePoolSize仍是maximumPoolSize)。1和3 是相似的,惟一的不一樣就是線程數的限制不一樣,因此這裏主要分析firstTask爲null 和 不爲null 的區別。

方法中retry: for (;;) {...}的內容主要是用於判斷是否線程池已經關閉,以及線程數量是否超過限制。若未關閉,未超過限制則把線程數加1。firstTask爲null的時候, w.thread不爲null,因此firstTask是否在addWorker中仍是沒有區別,那隻能更進一步看看worker裏對firstTask是如何處理的。

worker實現

線程池中的任務都是經過worker來代理的。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
        
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        
        //後續代碼此處省略...........................
}

Worker繼承與AQS,實現Runable接口,自己是線程類,且具備AQS的特性。

看worker構造方法:

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //Worker實現了Runnable因此,
            //因此this.thread.start(),就是用線程執行worker的run方法
            this.thread = getThreadFactory().newThread(this);
        }

setState(-1)爲AQS的方法,把狀態位設置成-1,這樣任何線程都不能獲得Worker的鎖,除非調用了unlock方法。這個unlock方法會在runWorker方法中一開始就調用,這是爲了確保Worker構造出來以後,沒有任何線程可以獲得它的鎖,除非調用了runWorker以後,其餘線程才能得到Worker的鎖。

再看其run方法:

/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

runWorker(this)不是worker的方法,是ThreadPoolExecutor的方法,也是執行任務的方法。

執行任務

又回到了ThreadPoolExecutor中,runWorker實現以下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts,建立worker時狀態設置爲-1了,此時設置爲1
        boolean completedAbruptly = true; //task是否意外終止,意外終止爲true,反之false
        try {
            //優先運行初始化時的firstTask, 若是firstTask已經執行了則從隊列取
            while (task != null || (task = getTask()) != null) {
                w.lock();  //獲取到task後鎖定,獨佔worker,保證線程安全
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); //空方法,用於子類擴展
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//空方法,用於子類擴展
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //移除執行完成的worker
            processWorkerExit(w, completedAbruptly);
        }
    }

到此咱們終於能回答前面的問題了,addWorker(Runnable firstTask, boolean core) 中firstTask爲null不不爲null的區別:

  • 爲null,addWorker(null, core) 表示建立一個worker,執行隊列中的task
  • 不爲null,addWorker(firstTask, core) 表示建立一個worker,先執行firstTask,再執行隊列中的task

他們都新增了一個線程,一個是直接執行隊列裏的任務,一個先執行當前任務,再執行隊列任務。

下面繼續分析runWorker。

線程池在runWorker方法中,經過while (task != null || (task = getTask()) != null)不斷從隊列中取出任務執行,等待隊列中任務執行完成後,調用processWorkerExit(w, completedAbruptly),移除當前worker。問題來了,這麼看起來線程池中的線程只有在隊列不爲空的時候才得以複用,這不科學啊,那問題在哪兒?反覆看代碼,惟一忽略的掉的地方就是getTask()了,看到這個方法的時候,想固然的認爲是簡單的獲取隊列中的任務,那麼咱們來看一下它的具體實現:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. 線程池是否已經關閉
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //表示worker是否須要回收
            //allowCoreThreadTimeOut=true時core線程超時也回收, 默認爲false
            //因此默認狀況下timed表示 wc > corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    //線程須要回收;嘗試取隊列中的任務,超過keepAliveTime還未取到返回null
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    //線程無需回收;取隊列中的任務, 隊列中沒有任務則一直等到有任務
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

上面代碼能夠看出getTask()確實是取任務,不過也兼任了 線程池在運行態取不到數據時 park線程等待線程直到超時(parkNanos) 的工做,咱們查看線程無需回收時park在取隊列任務的線程堆棧以下:

"pool-1-thread-1@731" prio=5 tid=0xd nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
      at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)

線程處於waiting狀態,從堆棧中能夠看到at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403),正是被workQueue.take() park住了。如此一來worker執行完當前線程以後,若是取不到新的任務就會一直處在park狀態,直到隊列中有新的任務進入。以ArrayBlockingQueue爲例看,看其takeenqueue實現:

/** Condition for waiting takes */
 private final Condition notEmpty;

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();   //park 線程
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();  //喚起線程
    }

關閉鏈接池

ThreadPoolExecutor提供了兩個關閉的方法:

  • shutdown(),關閉線程池,再也不接受新的任務,可是會處理完當前線程和隊列中的線程
  • shutdownNow() ,關閉線程池,再也不接受新的任務,且試圖中止全部正在執行的線程,並再也不處理還在池隊列中等待的任務。可是它試圖終止線程的方法是經過調用Thread.interrupt()方法來實現的,可是interrupt的做用有限,運行中的線程不必定能成功退出(具體緣由參考)。

下面看下實現:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);  //狀態設置爲SHUTDOWN
            interruptIdleWorkers();  //中斷空閒線程
            onShutdown(); // hook for ScheduledThreadPoolExecutor,這裏爲空方法
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);  //狀態設置爲STOP
            interruptWorkers();  //中斷所有線程
            tasks = drainQueue();  //返回隊列中未執行的任務
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

能夠看到shutdown和shutdownNow的實現大體相同,不一樣的地方有兩個,

  • 前者關閉時將狀態設置爲SHUTDOWN,後者爲STOP
  • 前者interruptIdleWorkers(),只中斷空閒線程;後者interruptWorkers(),中斷所有 線程,返回隊列中未執行的任務

設置狀態的源碼:

private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

interruptIdleWorkers():

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //若是線程未被中斷,且獲取work的鎖成功(說明空閒),則中斷線程
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

interruptWorkers():

//ThreadPoolExecutor
private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //中斷所有worker線程
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

//worker
void interruptIfStarted() {
            Thread t;
            //若worker已經啓動(未啓動時爲-1),且thread不爲null,且未被中斷
            //也就是說線程還存活着,那就發送中斷信號
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

tryTerminate()除了在關閉鏈接池時調用,還在其它地方調用了,這裏只分析在關閉鏈接池時它都作了什麼:

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //關閉鏈接池調用該方法第一次調用時:
            //狀態爲SHUTDOWN或STOP,都小於TIDYING,故前兩條件都不知足
            //第三個條件,隊列不爲空的時候直接返回了,
            //若是爲shutdown()則可能隊列不爲空,可能知足條件直接返回,也可能不知足
            //若是爲shutdownNow()則隊列被清空,不知足
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //若是worker數量不爲0則執行interruptIdleWorkers(true)
            //而後直接返回,完成該方法
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //嘗試設置狀態爲TIDYING,worker數量爲0,
                //期間ctl若未變更,則成功
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated(); //空方法用於子類擴展
                    } finally {
                        //設置狀態爲TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //喚醒調用了awaitTermination(long timeout, TimeUnit unit)的線程
                        //awaitTermination中調用了
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

tryTerminate()在關閉鏈接池時的作的判斷能夠簡單理解爲

    • 若是隊列不爲空直接返回
    • 存活worker數量不爲0則直接返回
    • 設置狀態爲TIDYING,TERMINATED

因此不管是shutdown仍是shutdownNow都不會阻塞線程,且不保證worker已經所有關閉。

參考

Java線程池ThreadPoolExecutor源碼分析
csdn-Java 線程池 ThreadPoolExecutor 源碼分析
詳細分析Java中斷機制
談談 Java 線程狀態相關的幾個方法

相關文章
相關標籤/搜索