我以前一篇文章談到了ThreadPoolExecutor的做用(http://my.oschina.net/xionghui/blog/494004),這篇文章介紹下它的原理,並根據原理分析下它的實現源碼。java
咱們先來查看一下ThreadPoolExecutor API,看看它能實現什麼功能,而後看看它是怎麼實現這些功能的。數據庫
ThreadPoolExecutor API比較長,這裏列出幾個關鍵點:安全
核心和最大池大小:若是運行的線程少於 corePoolSize,則建立新線程來處理請求(即一個Runnable實例),即便其它線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。函數
保持活動時間:若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止。ui
排隊:若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列BlockingQueue,而不添加新的線程。this
被拒絕的任務:當 Executor 已經關閉,或者隊列已滿且線程數量達到maximumPoolSize時(即線程池飽和了),請求將被拒絕。spa
好,ThreadPoolExecutor實現的功能確實不少,我們來屢屢ThreadPoolExecutor 的執行過程:.net
若是運行的線程少於 corePoolSize,ThreadPoolExecutor 會始終首選建立新的線程來處理請求;注意,這時即便有空閒線程也不會重複使用(這和數據庫鏈接池有很大差異)。線程
若是運行的線程等於或多於 corePoolSize,則 ThreadPoolExecutor 會將請求加入隊列BlockingQueue,而不添加新的線程(這和數據庫鏈接池也不同)。rest
若是沒法將請求加入隊列(好比隊列已滿),則建立新的線程來處理請求;可是若是建立的線程數超出 maximumPoolSize,在這種狀況下,請求將被拒絕。
到這兒你們應該瞭解了線程池的大概執行過程,下面經過源碼來介紹下ThreadPoolExecutor是如何實現這些過程和功能的。在理解源碼前我們先來考慮幾個問題:
線程池裏的線程如何重複利用?好比一個線程執行完請求,怎麼控制不退出。
線程池空閒時線程池裏的線程數量會不會降到0?
線程池如何保持活動時間?線程能夠設置一段時間內閒置就會退出(經過keepAliveTime 設置)。
線程池的阻塞隊列有什麼用?
請求數量太多如何處理過多的請求?
首先看下線程池的執行過程:
execute(Runnable command)是ThreadPoolExecutor的核心處理方法,用於處理Runnable 請求。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
poolSize爲線程池內啓動的線程數量,當線程池的poolSize小於核心池corePoolSize時,會去執行addIfUnderCorePoolSize(command),addIfUnderCorePoolSize(Runnable firstTask)會建立一個新線程來處理請求:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
能夠看到,首先加鎖(默認是非公平鎖)已保證線程安全,而後會進行double check,狀態合法則建立新線程。建立新線程處理任務是經過addThread(Runnable firstTask)方法來完成:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
能夠看到建立線程時使用了內部類Worker封裝了請求Runnable,Worker也是一個Runnable,它封裝了firstTask請求,做用後面再介紹。
這裏先介紹下經過threadFactory建立新線程的過程:threadFactory是能夠自定義的(經過ThreadPoolExecutor 的構造函數傳入),默認會使用DefaultThreadFactory,再來看看DefaultThreadFactory是如何建立新線程的:
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
代碼很明朗,建立了一個線程,設置爲非守護線程並設置優先級爲5。其中group和namePrefix是在DefaultThreadFactory的構造函數中定義的:
group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
如今回到addIfUnderCorePoolSize(Runnable firstTask)方法,建立完線程後會直接start,而後就會調用Work的run()方法,這裏介紹下Work的做用:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
其中firstTask就是execute(Runnable command)方法傳入的請求,能夠看到,若是firstTask不爲空,則直接執行,不然會經過getTask()從阻塞隊列中獲取等待的任務;到這裏能夠解答第一個問題了:線程池裏的線程如何重複利用?一個線程會執行多個請求(即Runnable),當執行完一個請求後會經過getTask()去獲取新的請求來執行(是從阻塞隊列中獲取,後面會介紹)。下面看看getTask()方法:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
挑重點介紹:當poolSize小於或等於corePoolSize時,會經過workQueue.take()一直等待,直到workQueue添加新的Runnable,到這裏能夠解答第二個問題了:線程池空閒時線程池裏的線程數量會不會降到0?答案是若是線程池裏的線程數量小於或等於核心線程數(corePoolSize)則不會退出任何線程。
然而當poolSize大於corePoolSize時或經過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)等待keepAliveTime(ns),這裏能夠解答第三個問題了:線程池如何保持活動時間?答案是經過阻塞隊列workQueue控制。
這裏須要注意下,當poolSize大於corePoolSize時且在keepAliveTime內沒有得到新的請求,則會判斷當前線程是否須要退出,經過workerCanExit()來判斷:
private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
從上面能夠看到線程退出的條件爲:運行狀態大於STOP,或者阻塞隊列爲空,或者當前線程數大於核心線程數。達到條件則返回false,此時getTask()會返回空,而後Work的run()方法裏面的while循環則會退出,線程此時會退出並銷燬。注意,退出前會執行workerDone(this)進行一些清理操做:
void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } }
介紹完了Work的處理過程我們再回到execute(Runnable command)方法,前面已經貼出源碼了,這裏再貼一份:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
前面講到當線程池的poolSize小於核心池corePoolSize時會去建立新線程來執行請求,而後若是poolSize超過了corePoolSize則會直接把請求Runnable添加進阻塞隊列workQueue裏,這裏有兩種狀況:
1. 若是添加成功,則直接返回。前面介紹過,線程池的線程會執行完本身的請求後會從阻塞隊列workQueue中取請求來執行。
2.若是添加失敗(好比隊列滿了),則會經過addIfUnderMaximumPoolSize(command)建立新的線程來處理請求。
到這裏能夠解答第四個問題了:線程池的阻塞隊列有什麼用?阻塞隊列有兩個做用:第一是爲了控制線程存活,經過workQueue的take和pull實現;第二是爲了存放Runnable對象,以便線程池裏空閒的線程處理。
下面繼續介紹addIfUnderMaximumPoolSize(command)方法:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
該方法和addIfUnderCorePoolSize(Runnable firstTask)方法相似,大體流程是若是線程池內建立的線程數小於最大線程數maximumPoolSize則建立新線程執行請求,不然返回false。
若是返回false,表示請求數量不能再被處理,此時會調用reject(command)來處理請求:
void reject(Runnable command) { handler.rejectedExecution(command, this); }
能夠看到,處理過程很簡單,就直接調用handler來處理請求;這裏的handler能夠自定義(一樣是經過構造函數傳入),handler默認是使用AbortPolicy:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); }
AbortPolicy處理也是很簡單粗暴,直接拋出非受查異常RejectedExecutionException。
到這裏也能夠解答第五個問題了:請求數量太多如何處理過多的請求?答案是經過handler處理的。
ThreadPoolExecutor設置確實十分精巧(做者就是大名鼎鼎的Doug Lea),上面介紹了它的一些實現細節;下面再來談談它的一些鉤子。
默認狀況下,線程池的線程只是在新任務到達時才建立和啓動的;若是但願預先啓動線程,能夠使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 。
prestartCoreThread()會建立並啓動一個線程,prestartAllCoreThreads()會啓動因此的corePoolSize個線程:
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null)) ++n; return n; } private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
另外還有兩個經常使用的鉤子方法:beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable)。
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
ThreadPoolExecutor內他們的默認實現爲空方法。咱們能夠擴展它們,它們會在執行請求先後調用:
private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } }
咱們一般使用 Executors 工廠方法來配置ThreadPoolExecutor,下面摘自ThreadPoolExecutor API: