經過了解RejectedExecutionException來分析ThreadPoolExecutor源碼

觀看本文章以前,最好看一下這篇文章熟悉下ThreadPoolExecutor基礎知識。java

1.關於Java多線程的一些常考知識點
2.看ThreadPoolExecutor源碼前的騷操做

講解本篇文章從下面一個例子開始,test1()和test2()方法都會拋出RejectedExecutionException異常,ThreadPoolExecutor默認的拒絕任務策略是AbortPolicy。test1()中線程池中corePoolSize和maximumPoolSize都爲2,阻塞隊列的長度是10,線程池最多能處理12個任務。當超過12個任務時,就會拒絕新的任務,拋出RejectedExecutionException。而test2()中的任務沒有超過線程池的閥值,可是在線程池調用shutdown()後,線程池的狀態會變成shutdown,此時不接收新任務,但會處理正在運行的任務和在阻塞隊列中等待處理的任務。因此咱們在shutdown()以後再調用submit(),會拋出RejectedExecutionException異常。有了這個例子的基礎,咱們再來分析源碼,會好過一點。安全

/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: 分析拋出RejectedExecutionException問題
 * @date 2018/8/16 14:35
 */
public class RejectedExecutionExceptionTest {

    public static void main(String[] args) {
//        test1();
        test2();
    }

    /**
     * 提交的任務數量超過其自己最大能處理的任務量
     */
    public static void test1() {
        CustomThreadPoolExecutor customThreadPoolExecutor =
                new CustomThreadPoolExecutor(2, 2,
                        0L,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 13; i++) {
            CustomThreadPoolExecutor.CustomTask customTask
                    = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(60 * 60);
                        System.out.println("線程" + Thread.currentThread().getName()
                                + "正在執行...");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }, "success");

            if (i == 12) {
                // throw RejectedExectionException
                customThreadPoolExecutor.submit(customTask);
            } else {
                customThreadPoolExecutor.submit(customTask);
            }
        }
        customThreadPoolExecutor.shutdown();
    }

    /**
     * 當線程池shutdown()後,會中斷空閒線程。可是正在運行的線程和處於阻塞隊列等待執行的線程不會中斷。
     * shutdown(),不會接收新的線程。
     */
    public static void test2() {
        CustomThreadPoolExecutor customThreadPoolExecutor =
                new CustomThreadPoolExecutor(2, 2,
                        0L,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 2; i++) {
            CustomThreadPoolExecutor.CustomTask customTask
                    = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(60 * 60);
                        System.out.println("線程" + Thread.currentThread().getName()
                                + "正在執行...");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }, "success");
            customThreadPoolExecutor.submit(customTask);
        }
        customThreadPoolExecutor.shutdown();

        CustomThreadPoolExecutor.CustomTask customTask
                = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(60 * 60);
                    System.out.println("線程" + Thread.currentThread().getName()
                            + "正在執行...");
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }, "success");

        customThreadPoolExecutor.submit(customTask);
    }

}
複製代碼

源碼分析

線程池執行過程

關於線程池執行過程,咱們看下面一幅圖,就能明白個大概。 1.當線程池中的線程數量小於corePoolSize,就會建立新的線程來處理添加的任務直至線程數量等於corePoolSize。bash

2.當線程池中的線程數量大於等於corePoolSize且阻塞隊列(workQueue)未滿,就會把新添加的任務放到阻塞隊列中。網絡

3.當線程池中的線程數量大於等於corePoolSize且阻塞隊列滿了,就會建立線程來處理添加的任務直到線程數量等於maximumPoolSize多線程

4.若是線程池的數量大於maximumPoolSize,會根據RejectedExecutionHandler策略來拒絕任務。AbortPolicy就是其中的一種拒絕任務策略。 併發

線程池執行過程(圖來自於網絡).png


submit

submit()相比於execute()而言,多了RunnableFuture<Void> ftask = newTaskFor(task, null);這一步,把task包裝成RunnableFuture類型的ftask。因此submit()有返回值,返回值類型是Future<?>,能夠經過get()獲取線程執行完畢後返回的值。還能夠經過isDone()isCancelled()cancel(boolean mayInterruptIfRunning)這些方法進行某些操做。好比判斷線程是否執行完畢、判斷線程是否被取消,顯式取消啓動的線程的操做。ide

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
複製代碼

execute

線程池去處理被提交的任務,很明顯經過execute()方法提交的任務必需要實現Runnable接口。oop

咱們來仔細看下execute()註釋,發現它說到:若是任務不能被成功提交獲得執行,由於線程池已經處於shutdown狀態或者是任務數量已經達到容器上限,任務會被RejectedExecutionHandler處理進行拒絕操做。很明顯,註釋已經告訴上文拋出RejectedExecutionException異常的答案了。有時候真的要仔細看註釋!!!多看註釋,事半功倍。源碼分析

咱們來看execute()中作了什麼操做。ui

1.獲取線程池的狀態,若是線程池中的線程數量小於corePoolSize,調用addWorker(command, true)建立新的線程去處理command任務。若是addWorker()返回失敗,咱們再次獲取線程池的狀態。由於addWorker()失敗的緣由可能有:線程池已經處於shutdown狀態不接收新的任務或者是存在併發,在workerCountOf(c) < corePoolSize這塊代碼後,有其餘的線程建立了worker線程,致使worker線程的數量大於等於corePoolSize

2.若是線程池的數量大於等於corePoolSize,且線程池的狀態處於RUNNING狀態,咱們將任務放到阻塞隊列中。當任務成功放入阻塞隊列中,咱們仍然須要一個雙重校驗的機制去判斷是否應該建立新的線程去處理任務。

由於會存在這些狀況:有些線程在咱們上次校驗後已經死掉、線程池在上次校驗後忽然關閉處於shutdown狀態。考慮到這些緣由,咱們必須再次校驗線程池的狀態。若是線程池的狀態不處於RUNNING狀態,那麼就行回滾操做,把剛纔入隊的任務移除掉,後續經過reject(command)執行拒絕任務策略。

若是線程池處於RUNNING狀態且線程池中線程數量等於0或者從阻塞隊列中刪除任務失敗(意味着:這個任務已經被其餘線程處理掉了)且線程池中線程數量等於0,那麼調用addWorker(null, false)新建一個worker線程,去消費workQueue中裏面的任務

3.若是線程池不處於RUNNING狀態或者任務沒法成功入隊(此時阻塞隊列已經滿了),此時須要建立新的線程擴容至maximumPoolSize。若是addWorker(command, false)返回false,那麼經過reject(command)執行拒絕任務策略。

這裏再嘮叨幾句,調用addWorker()有這4種傳參的方式,適用於不一樣場景。

1.addWorker(command, true)當線程池中的線程數量少於corePoolSize,會把command包裝成worker而且放入到workers集合中。若是線程池中的線程數量超過了corePoolSize,會返回false。

2.addWorker(command, false)當阻塞隊列滿了,一樣會把command包裝成worker而且放入到worker集合中。若是線程池中的線程數量超過了maximumPoolSize,會返回false。

3.addWorker(null, false)說明firstTask是個空任務,一樣把它包裝成worker而且放入到worker集合中。若是線程池中的數量超過了maximumPoolSize,會返回false。這樣firstTask爲空的worker在線程執行的時候,也能夠從阻塞隊列中獲取任務去處理。

4.addWorker(null, true):和上面同樣,只是線程池的線程數量限制在corePoolSize,超過也是返回false。使用它的有prestartAllCoreThreads()prestartCoreThread()這2個方法,其使用目的是預加載線程池中的核心線程。

/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } 複製代碼

addWorker

addWorker()主要是建立新的線程,而後執行任務。

1.首先判斷線程池的狀態是否知足建立worker線程的要求。

若是線程池的狀態大於SHUTDOWN狀態,那麼此時處於STOP、TIDYING、TERMINATE狀態,不能建立worker線程,返回false。

若是線程池處於shutdown狀態且firstTask不等於null,此時也沒法建立worker線程。由於處於shutdown狀態的線程池不會去接收新的任務。

若是線程池處於shutdown狀態且firstTask等於null且workQueue阻塞隊列爲空,此時就更沒有必要建立worker線程了。由於firstTask爲null,就是爲了建立一個沒有任務的worker線程去阻塞隊列裏面獲取任務。而阻塞隊列都已經爲空,那麼再建立一個firstTask爲null的worker線程顯然沒有什麼意思,返回false便可。

  1. 判斷線程池中的線程數量是否超過最大值。當core爲true,最大值爲corePoolSize。當core爲false,最大值爲maximumPoolSize。若是超過最大值,也沒法建立worker線程,直接返回false便可。若是沒有超過最大值,經過CAS操做讓當前線程數加1,而後經過標籤跳轉跳出循環體至retry:位置。若是CAS操做失敗,說明workerCount被其餘線程修改過。咱們再次獲取ctl,判斷當前線程池狀態和以前的狀態是否匹配。若是不匹配,說明線程池狀態發生變動,繼續循環操做。

3.經過傳入來的firstTask建立worker線程。Worker的構造方法中經過setState(-1)設置state(同步狀態)爲-1。Worker繼承了AbstractQueuedSynchronizer,其自己是一把不可重入鎖。getThreadFactory().newThread(this)建立新線程,由於Worker實現了Runnable接口,其自己也是一個可執行的任務。

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
複製代碼

4.咱們往workers添加worker線程時,經過ReentrantLock保證線程安全。只有在當前線程池處於RUNNING狀態或者是處於SHUTDOWN狀態且firstTask等於null的狀況下,才能夠添加worker線程。若是worker線程已經處於啓動且未死亡的狀態,會拋出IllegalThreadStateException異常。

添加完畢後,啓動worker線程。若是worker線程啓動成功返回true,啓動失敗調用addWorkerFailed()進行回滾操做。

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
複製代碼
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

Worker

咱們來看下ThreadPoolExecutor的內部類Worker,上文已經說到Worker繼承了AbstractQueuedSynchronizer類且實現了Runnable接口。因此說是一個可執行的任務,也是一把不可重入鎖,具備排他性。

1.咱們建立Worker對象時,默認的state爲-1。咱們中斷的時候,要獲取worker對象的鎖(state從0 CAS到1)。獲取鎖成功後,才能進行中斷。這說明了在初始化worker對象階段,不容許中斷。只有調用了runWorker()以後,將state置爲0,才能中斷。

2.shutdown()中調用interruptIdleWorkers()中斷空閒線程和shutdownNow()中調用interruptWorkers()中斷全部線程。

interruptIdleWorkers()中中斷空閒線程的前提是要獲取worker對象的鎖。

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
複製代碼

interruptWorkers()中中斷全部線程時,不用調用tryLock()獲取worker對象的鎖,最終是經過worker中的interruptIfStarted()來中斷線程。在這個方法中只有state大於等於0且線程不等於null且線程沒有被中斷過,才能進行中斷操做。說明只有通過了runworker()階段才能進行中斷操做。

這也是Worker爲何要設計成不可重入的緣由,就是爲了防止中斷在運行中的任務,只會中斷在等待從workQueue中經過getTask()獲取任務的線程(由於他們沒有上鎖,此時state爲0)。

如下這5種方法都會調用到interruptIdleWorkers()去中斷空閒線程。

setCorePoolSize()
setKeepAliveTime(long time, TimeUnit unit)
setMaximumPoolSize(int maximumPoolSize)
shutdown()
allowCoreThreadTimeOut(boolean value)
複製代碼

還有一點必須強調。Task沒有真正的被執行,執行的是Work線程。Work線程中只是調用到了Task中的run()方法。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
複製代碼

runWorker

1.work線程啓動後,會調用其run()方法。run()方法再去調用runWorker(this)方法。

2.執行任務以前,獲取work線程中的task,而後釋放worker的鎖。讓state狀態從-1 CAS到0。當state爲0,說明能夠去中斷此線程。

3.以輪詢的方式經過getTask()從阻塞隊列中獲取task,當task爲null,跳出輪詢。

4.開始執行任務的時候,經過lock()獲取鎖,將state從0 CAS到1。任務執行完畢時,經過unlock()釋放鎖。

5.若是線程池處於STOP、TIDYING、TERMINATE狀態,要中斷worker線程。

6.經過beforeExecute(wt, task)和afterExecute(task, thrown)對task進行前置和後置處理。

7.在task.run()、beforeExecute(wt, task)、afterExecute(task, thrown)發生異常時都會致使worker線程終止。經過調用processWorkerExit(w, completedAbruptly)來進行worker退出操做。

8.在getTask()獲取阻塞隊列中的任務,若是隊列中沒有任務或者是獲取任務超時,都會調用processWorkerExit(w, completedAbruptly)來進行worker退出操做。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
複製代碼

getTask

上文已經提起過getTask()方法,主要是從阻塞隊列獲取task的。那麼條件下task會返回null呢?咱們能夠經過註釋獲得一些信息。

  • 超過了maximumPoolSize設置的線程數量,由於調用了setMaximumPoolSize()方法。
  • 線程池處於stop狀態。
  • 線程池處於shutdown狀態且workQueue爲空.
  • 獲取任務等待超時。

1.首先獲取線程池運行狀態,若是線程池的狀態處於shutdown狀態且workQueue爲空,或者處於stop狀態。而後調用decrementWorkerCount()遞減workerCount,最後返回null。

* Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
複製代碼

2.allowCoreThreadTimeOut默認爲false。爲false的時候,核心線程即時在空閒時也會保持活躍。爲true的時候,核心線程在keepAliveTime時間範圍內等待工做。若是線程池的數量超過maximumPoolSize或者等待任務超時或者workQueue爲空,那麼直接經過CAS減小workerCount數量,返回null。

3.若是timed爲true,經過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)獲取task,等待時間超過了keepAliveTime還沒獲取到task,直接返回null。若是timed爲false,經過workQueue.take()獲取task。若是沒有獲取到task,會一直阻塞當前線程直到獲取到task(當阻塞隊列中加入了新的任務,會喚醒當前線程)爲止。

4.若是獲取task成功,就直接返回。若是獲取task超時,timedOut會置爲true,會在下一次循環中以返回null了結。

再強調一點,只有當線程池中的線程數量大於corePoolSize纔會進行獲取任務超時檢查,這也體現線程池中的一種策略:當線程池中線程數量達到maximumPoolSize大小後,若是一直沒有任務進來,會逐漸減小workerCount直到線程數量等於corePoolSize。

/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

processWorkerExit

1.completedAbruptly爲true,說明worker線程時忽然終止,說明執行task.run()發生了異常,因此要經過CAS減小workerCount的數量。 2.completedAbruptly爲false,說明worker線程是正常終止,不須要對workerCount進行減小的操做。由於在getTask()中已經作了此操做。

3.對worker完成的任務數進行統計,而且從workers集合中移出。

4.調用tryTerminate()方法,嘗試終止線程池。若是狀態知足的話,線程池還存在線程,會調用interruptIdleWorkers(ONLY_ONE)進行中斷處理,使其進入退出流程。若是線程池中的線程數量等於0的話,經過CAS把線程池的狀態更新到TIDYING。而後經過terminated()進行一些結束的處理,最後經過CAS把線程池狀態更新到TERMINATED。最後的最後,調用termination.signalAll()喚醒等待的線程,通知它們線程池已經終止。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
複製代碼

5.獲取線程池的狀態。若是線程池的狀態還處於RUNNING、SHUTDOWN,說明tryTerminate()沒有成功。若是worker線程是忽然終止的話,經過addWorker(null, false)再建立一個沒有task的worker線程去處理任務。

6.若是worker線程是正常終止的話,且當前線程池中的線程數量小於須要維護的數量,咱們也會經過addWorker(null, false)再建立一個沒有task的worker線程去處理任務。

7.默認狀況下allowCoreThreadTimeOut爲false,那麼min就等於corePoolSize。那麼線程池須要維護的線程數量就是corePoolSize個。若是allowCoreThreadTimeOut爲true,min就等於0。在workQueue不等於空的狀況,min會被賦值成1。此時線程池須要維護的線程池數量是1。

若是線程池處於shutdown狀態,在workQueue不爲空的狀況下,線程池始終會維護corePoolSize個線程。當workQueue爲空的話,線程池會逐漸銷燬這corePoolSize個線程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 複製代碼

尾言

你們好,我是cmazxiaoma(寓意是沉夢昂志的小馬),感謝各位閱讀本文章。 小弟不才。 若是您對這篇文章有什麼意見或者錯誤須要改進的地方,歡迎與我討論。 若是您以爲還不錯的話,但願大家能夠點個贊。 但願個人文章對你能有所幫助。 有什麼意見、看法或疑惑,歡迎留言討論。

最後送上:心之所向,素履以往。生如逆旅,一葦以航。

saoqi.png
相關文章
相關標籤/搜索