上一篇咱們簡單描述了Executor框架的結構,本篇正式開始併發包中部分源碼的解讀。java
咱們知道,目前主流的商用虛擬機在線程的實現上可能會有所差異。但無論如何實現,在開啓和關閉線程時必定會耗費不少CPU資源,甚至在線程的掛起和恢復JDK1.6都作了自旋鎖的優化。因此,使用線程池來管理和執行多線程任務會大大提升程序執行效率。關於使用線程池的優勢這裏不作過多說明,咱們直接進入Java5併發包中ThreadPoolExecutor的實現的源碼。安全
在解讀源碼前,咱們先來看看建立線程池的通常作法和線程池的幾種類別:多線程
1 Executors.newFixedThreadPool(int nThreads); // 建立一個固定線程數的線程池 2 Executors.newScheduledThreadPool(int nThreads); // 建立一個可對線程進行時間調度的線程池 3 Executors.newCachedThreadPool(); // 建立一個可緩衝的無線程數量界限(Integer.MAX_VALUE)的線程池 4 Executors.newSingleThreadExecutor(); // 建立一個可複用的單一線程的線程池
咱們重點來看一、三、4條,在Executors中如何實現的併發
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
能夠看到,差異只是ThreadPoolExecutor的構造方法的參數不一樣,下面來看看ThreadPoolExecutor的構造方法的參數(按順序):框架
從參數說明中看出,一、三、4中的線程池主要是「核心線程數」和「最大線程數」的差異,而keepAliveTime和workQueue的差異是由「核心線程數」和「最大線程數」是否相等來決定的。那麼「核心線程數」和「最大線程數」分別表明什麼?帶着這個疑問進入execute方法,源碼以下:oop
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
第4行的代碼表達一件事:當線程池中當前線程數小於核心線程數時,執行addIfUnderCorePoolSize(command)方法,而且執行成功後再也不執行後面的邏輯。那咱們就來看看這個addIfUnderCorePoolSize(command)方法作了什麼:優化
1 /** 2 * Creates and starts a new thread running firstTask as its first 3 * task, only if fewer than corePoolSize threads are running 4 * and the pool is not shut down. 5 * @param firstTask the task the new thread should run first (or 6 * null if none) 7 * @return true if successful 8 */ 9 private boolean addIfUnderCorePoolSize(Runnable firstTask) { 10 Thread t = null; 11 final ReentrantLock mainLock = this.mainLock; 12 mainLock.lock(); 13 try { 14 if (poolSize < corePoolSize && runState == RUNNING) 15 t = addThread(firstTask); 16 } finally { 17 mainLock.unlock(); 18 } 19 if (t == null) 20 return false; 21 t.start(); 22 return true; 23 }
方法註釋的主要意思是:當運行線程少於核心線程時,就建立並運行一個新的線程。代碼的第15行建立了一個新的線程,第21行運行了這個線程。接下來看看如何建立的這個線程:this
1 private Thread addThread(Runnable firstTask) { 2 Worker w = new Worker(firstTask); 3 Thread t = threadFactory.newThread(w); 4 if (t != null) { 5 w.thread = t; 6 workers.add(w); 7 int nt = ++poolSize; 8 if (nt > largestPoolSize) 9 largestPoolSize = nt; 10 } 11 return t; 12 }
第二行能夠看到,線程池中真正執行的線程是由名爲Worker的內部類來執行的,關於Worker的主要結構和方法以下:spa
(注:addThread方法的註釋中強調了要在持有mainLock的鎖時才能調用,mainLock鎖在線程池的安全併發的實現中擔任着很是重要的角色,而且對於firstTask,有一點不一樣的邏輯在,因爲篇幅有限,本文這裏不作重點解讀了)線程
1 private final class Worker implements Runnable { 2 3 // others codes 4 5 /** 6 * Main run loop 7 */ 8 public void run() { 9 try { 10 Runnable task = firstTask; 11 firstTask = null; 12 while (task != null || (task = getTask()) != null) { 13 runTask(task); 14 task = null; 15 } 16 } finally { 17 workerDone(this); 18 } 19 } 20 }
能夠看到,Worker實現了Runnable接口,線程池中執行的線程實際上是Worker的run()方法。而第13行的runTask(task)方法的實現是直接調用了提交到線程池中的Runnable任務的run方法(具體代碼請自行查看源碼,這裏再也不列出,其中還包含一些針對shutdown和shutdownNow的邏輯),還有比較重要的是第12行的getTask()方法,最後來看getTask()的源碼:
1 Runnable getTask() { 2 for (;;) { 3 try { 4 int state = runState; 5 if (state > SHUTDOWN) 6 return null; 7 Runnable r; 8 if (state == SHUTDOWN) // Help drain queue 9 r = workQueue.poll(); 10 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 11 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 12 else 13 r = workQueue.take(); 14 if (r != null) 15 return r; 16 if (workerCanExit()) { 17 if (runState >= SHUTDOWN) // Wake up others 18 interruptIdleWorkers(); 19 return null; 20 } 21 // Else retry 22 } catch (InterruptedException ie) { 23 // On interruption, re-check runState 24 } 25 } 26 }
以上代碼第13行將線程池保持線程不關閉的實現已經展現出來了:由一個死循環不斷的從隊列中取出提交到線程池中的Runnable任務,而後直接調用其run()方法便可。
基於這個原理,咱們就會很容易的看懂其它的一些特性。
讓咱們先回頭看看關於「核心線程」的源碼,回到最開始的execute()的源碼:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
前面咱們說根據第4行,當線程池中當前線程數小於核心線程數時,執行addIfUnderCorePoolSize(command)方法並再也不執行後面的代碼。而噹噹前線程數大於等於核心線程數時,就會直接執行第5行的workQueue.offer(command),將新任務添加到名爲workQueue隊列中,也就是死循環中不斷取Runnable任務的隊列。這裏這個workQueue是由構造方法傳進來的workQueue隊列。經過Executors建立線程池的一、三、4條種類能夠看出,核心線程=最大線程的線程池,使用最大容量(Integer.MAX_VALUE)的LinkedBlockingQueue隊列,就是說,線程池沒法擴展,超出的Runnable任務所有進入阻塞隊列中,等待Worker執行完。而核心線程<最大線程的線程池,使用無容量的SynchronousQueue隊列,就是說,線程池能夠無限擴展,擴展的線程所有新建Worker並執行。但根據getTask()方法的第10行和第11行,超出核心線程數的Worker,空閒時只會存活keepAliveTime時間(構造方法的參數)。
OK,到這裏,經過源碼已經解釋了ThreadPoolExecutor線程池主要的特性的實現原理。
上面羅裏吧嗦的一大堆主要說明了JDK源碼中實現的ThreadPoolExecutor線程池的如下幾個主要特性(來自JDK API的描述):
核心線程數與最大線程數的意義:
ThreadPoolExecutor將根據corePoolSize和maximumPoolSize設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable)中提交時,若是運行的線程少於corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於corePoolSize而少於maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的corePoolSize和maximumPoolSize相同,則建立了固定大小的線程池。若是將 maximumPoolSize設置爲基本的無界值(如Integer.MAX_VALUE),則容許池適應任意數量的併發任務。
保持活動時間:
若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止
排隊:
全部
BlockingQueue
均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:
- 若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
- 若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
- 若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。
其它的特性,如終止線程池的幾種方式及被拒絕的任務由構造方法傳入的handler處理等本文並未給出源碼解讀,感興趣的讀者可自行查看JDK源碼。
另外,關於ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,本文不打算詳細介紹了。其核心原理是同樣的,只是多了「Schedule」的功能。而這個任務調度的功能是經過構造時傳入的DelayQueue來實現的,你們若是感興趣能夠看下DelayQueue的介紹:「Delayed元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素」。「延遲期滿」的原理是經過lock包中ReadWriteLock鎖獲取的Condition的awaitNanos(long nanosTimeout)方法來實現的。
本文經過部分關鍵處源碼的解讀,介紹了ThreadPoolExecutor線程池的實現原理。我我的簡單總結爲兩點:
這兩點只是作歸納,真正展開來描述,仍是有不少細節的。