ThreadPoolExecutor源碼詳解

我以前一篇文章談到了ThreadPoolExecutor的做用(http://my.oschina.net/xionghui/blog/494004),這篇文章介紹下它的原理,並根據原理分析下它的實現源碼。java

咱們先來查看一下ThreadPoolExecutor API,看看它能實現什麼功能,而後看看它是怎麼實現這些功能的。數據庫

ThreadPoolExecutor API

ThreadPoolExecutor API比較長,這裏列出幾個關鍵點:安全

  • 核心和最大池大小:若是運行的線程少於 corePoolSize,則建立新線程來處理請求(即一個Runnable實例),即便其它線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。函數

  • 保持活動時間:若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止。ui

  • 排隊:若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列BlockingQueue,而不添加新的線程。this

  • 被拒絕的任務:當 Executor 已經關閉,或者隊列已滿且線程數量達到maximumPoolSize時(即線程池飽和了)請求將被拒絕。spa

好,ThreadPoolExecutor實現的功能確實不少,我們來屢屢ThreadPoolExecutor 的執行過程:.net

  1. 若是運行的線程少於 corePoolSize,ThreadPoolExecutor 會始終首選建立新的線程來處理請求;注意,這時即便有空閒線程也不會重複使用(這和數據庫鏈接池有很大差異)。線程

  2. 若是運行的線程等於或多於 corePoolSize,則 ThreadPoolExecutor 會將請求加入隊列BlockingQueue,而不添加新的線程(這和數據庫鏈接池也不同)rest

  3. 若是沒法將請求加入隊列(好比隊列已滿),則建立新的線程來處理請求;可是若是建立的線程數超出 maximumPoolSize,在這種狀況下,請求將被拒絕。

到這兒你們應該瞭解了線程池的大概執行過程,下面經過源碼來介紹下ThreadPoolExecutor是如何實現這些過程和功能的。在理解源碼前我們先來考慮幾個問題:

  1. 線程池裏的線程如何重複利用?好比一個線程執行完請求,怎麼控制不退出。

  2. 線程池空閒時線程池裏的線程數量會不會降到0?

  3. 線程池如何保持活動時間?線程能夠設置一段時間內閒置就會退出(經過keepAliveTime 設置)。

  4. 線程池的阻塞隊列有什麼用?

  5. 請求數量太多如何處理過多的請求?

ThreadPoolExecutor源碼

首先看下線程池的執行過程:

execute(Runnable command)是ThreadPoolExecutor的核心處理方法,用於處理Runnable 請求。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

poolSize爲線程池內啓動的線程數量,當線程池的poolSize小於核心池corePoolSize時,會去執行addIfUnderCorePoolSize(command),addIfUnderCorePoolSize(Runnable firstTask)會建立一個新線程來處理請求:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

能夠看到,首先加鎖(默認是非公平鎖)已保證線程安全,而後會進行double check,狀態合法則建立新線程。建立新線程處理任務是經過addThread(Runnable firstTask)方法來完成:

 private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

能夠看到建立線程時使用了內部類Worker封裝了請求Runnable,Worker也是一個Runnable,它封裝了firstTask請求,做用後面再介紹。

這裏先介紹下經過threadFactory建立新線程的過程:threadFactory是能夠自定義的(經過ThreadPoolExecutor 的構造函數傳入),默認會使用DefaultThreadFactory,再來看看DefaultThreadFactory是如何建立新線程的:

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

代碼很明朗,建立了一個線程,設置爲非守護線程並設置優先級爲5。其中group和namePrefix是在DefaultThreadFactory的構造函數中定義的:

group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";


如今回到addIfUnderCorePoolSize(Runnable firstTask)方法,建立完線程後會直接start,而後就會調用Work的run()方法,這裏介紹下Work的做用:

public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

其中firstTask就是execute(Runnable command)方法傳入的請求,能夠看到,若是firstTask不爲空,則直接執行,不然會經過getTask()從阻塞隊列中獲取等待的任務;到這裏能夠解答第一個問題了:線程池裏的線程如何重複利用?一個線程會執行多個請求(即Runnable),當執行完一個請求後會經過getTask()去獲取新的請求來執行(是從阻塞隊列中獲取,後面會介紹)。下面看看getTask()方法

Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

挑重點介紹:當poolSize小於或等於corePoolSize時,會經過workQueue.take()一直等待,直到workQueue添加新的Runnable,到這裏能夠解答第二個問題了:線程池空閒時線程池裏的線程數量會不會降到0?答案是若是線程池裏的線程數量小於或等於核心線程數(corePoolSize)則不會退出任何線程。

然而當poolSize大於corePoolSize時或經過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)等待keepAliveTime(ns),這裏能夠解答第三個問題了:線程池如何保持活動時間?答案是經過阻塞隊列workQueue控制。

這裏須要注意下,當poolSize大於corePoolSize時且在keepAliveTime內沒有得到新的請求,則會判斷當前線程是否須要退出,經過workerCanExit()來判斷:

private boolean workerCanExit() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try {
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
        } finally {
            mainLock.unlock();
        }
        return canExit;
    }

從上面能夠看到線程退出的條件爲:運行狀態大於STOP,或者阻塞隊列爲空,或者當前線程數大於核心線程數。達到條件則返回false,此時getTask()會返回空,而後Work的run()方法裏面的while循環則會退出,線程此時會退出並銷燬。注意,退出前會執行workerDone(this)進行一些清理操做:

void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }


介紹完了Work的處理過程我們再回到execute(Runnable command)方法,前面已經貼出源碼了,這裏再貼一份:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

前面講到當線程池的poolSize小於核心池corePoolSize時會去建立新線程來執行請求,而後若是poolSize超過了corePoolSize則會直接把請求Runnable添加進阻塞隊列workQueue裏,這裏有兩種狀況:

1. 若是添加成功,則直接返回。前面介紹過,線程池的線程會執行完本身的請求後會從阻塞隊列workQueue中取請求來執行。

2.若是添加失敗(好比隊列滿了),則會經過addIfUnderMaximumPoolSize(command)建立新的線程來處理請求。

到這裏能夠解答第四個問題了:線程池的阻塞隊列有什麼用?阻塞隊列有兩個做用:第一是爲了控制線程存活,經過workQueue的take和pull實現;第二是爲了存放Runnable對象,以便線程池裏空閒的線程處理。

下面繼續介紹addIfUnderMaximumPoolSize(command)方法:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

該方法和addIfUnderCorePoolSize(Runnable firstTask)方法相似,大體流程是若是線程池內建立的線程數小於最大線程數maximumPoolSize則建立新線程執行請求,不然返回false。

若是返回false,表示請求數量不能再被處理,此時會調用reject(command)來處理請求:

void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

能夠看到,處理過程很簡單,就直接調用handler來處理請求;這裏的handler能夠自定義(一樣是經過構造函數傳入),handler默認是使用AbortPolicy:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }

AbortPolicy處理也是很簡單粗暴,直接拋出非受查異常RejectedExecutionException。

到這裏也能夠解答第五個問題了:請求數量太多如何處理過多的請求?答案是經過handler處理的。

ThreadPoolExecutor 鉤子

ThreadPoolExecutor設置確實十分精巧(做者就是大名鼎鼎的Doug Lea),上面介紹了它的一些實現細節;下面再來談談它的一些鉤子。

默認狀況下,線程池的線程只是在新任務到達時才建立和啓動的;若是但願預先啓動線程,能夠使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 。

prestartCoreThread()會建立並啓動一個線程,prestartAllCoreThreads()會啓動因此的corePoolSize個線程

public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }
    
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }
    
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

另外還有兩個經常使用的鉤子方法:beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable)。

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

ThreadPoolExecutor內他們的默認實現爲空方法。咱們能夠擴展它們,它們會在執行請求先後調用:

private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

正確使用ThreadPoolExecutor

咱們一般使用 Executors 工廠方法來配置ThreadPoolExecutor,下面摘自ThreadPoolExecutor API:

相關文章
相關標籤/搜索