JDK源碼分析之concurrent包(二) -- 線程池ThreadPoolExecutor

  上一篇咱們簡單描述了Executor框架的結構,本篇正式開始併發包中部分源碼的解讀。java

  咱們知道,目前主流的商用虛擬機在線程的實現上可能會有所差異。但無論如何實現,在開啓和關閉線程時必定會耗費不少CPU資源,甚至在線程的掛起和恢復JDK1.6都作了自旋鎖的優化。因此,使用線程池來管理和執行多線程任務會大大提升程序執行效率。關於使用線程池的優勢這裏不作過多說明,咱們直接進入Java5併發包中ThreadPoolExecutor的實現的源碼。安全


 

在解讀源碼前,咱們先來看看建立線程池的通常作法和線程池的幾種類別:多線程

1 Executors.newFixedThreadPool(int nThreads); // 建立一個固定線程數的線程池
2 Executors.newScheduledThreadPool(int nThreads); // 建立一個可對線程進行時間調度的線程池
3 Executors.newCachedThreadPool(); // 建立一個可緩衝的無線程數量界限(Integer.MAX_VALUE)的線程池
4 Executors.newSingleThreadExecutor(); // 建立一個可複用的單一線程的線程池

咱們重點來看一、三、4條,在Executors中如何實現的併發

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

能夠看到,差異只是ThreadPoolExecutor的構造方法的參數不一樣,下面來看看ThreadPoolExecutor的構造方法的參數(按順序):框架

  • corePoolSize - 池中所保存的線程數,包括空閒線程。
  • maximumPoolSize - 池中容許的最大線程數。
  • keepAliveTime - 當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
  • unit - keepAliveTime 參數的時間單位。
  • workQueue - 執行前用於保持任務的隊列。此隊列僅保持由 execute 方法提交的 Runnable 任務。
  • threadFactory - 執行程序建立新線程時使用的工廠。
  • handler - 因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。

從參數說明中看出,一、三、4中的線程池主要是「核心線程數」和「最大線程數」的差異,而keepAliveTime和workQueue的差異是由「核心線程數」和「最大線程數」是否相等來決定的。那麼「核心線程數」和「最大線程數」分別表明什麼?帶着這個疑問進入execute方法,源碼以下:oop

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 5         if (runState == RUNNING && workQueue.offer(command)) {
 6             if (runState != RUNNING || poolSize == 0)
 7                 ensureQueuedTaskHandled(command);
 8         }
 9         else if (!addIfUnderMaximumPoolSize(command))
10             reject(command); // is shutdown or saturated
11     }
12 }

第4行的代碼表達一件事:當線程池中當前線程數小於核心線程數時,執行addIfUnderCorePoolSize(command)方法,而且執行成功後再也不執行後面的邏輯。那咱們就來看看這個addIfUnderCorePoolSize(command)方法作了什麼:優化

 1 /**
 2  * Creates and starts a new thread running firstTask as its first
 3  * task, only if fewer than corePoolSize threads are running
 4  * and the pool is not shut down.
 5  * @param firstTask the task the new thread should run first (or
 6  * null if none)
 7  * @return true if successful
 8  */
 9 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
10     Thread t = null;
11     final ReentrantLock mainLock = this.mainLock;
12     mainLock.lock();
13     try {
14         if (poolSize < corePoolSize && runState == RUNNING)
15             t = addThread(firstTask);
16     } finally {
17         mainLock.unlock();
18     }
19     if (t == null)
20         return false;
21     t.start();
22     return true;
23 }

 方法註釋的主要意思是:當運行線程少於核心線程時,就建立並運行一個新的線程。代碼的第15行建立了一個新的線程,第21行運行了這個線程。接下來看看如何建立的這個線程:this

 1 private Thread addThread(Runnable firstTask) {
 2     Worker w = new Worker(firstTask);
 3     Thread t = threadFactory.newThread(w);
 4     if (t != null) {
 5         w.thread = t;
 6         workers.add(w);
 7         int nt = ++poolSize;
 8         if (nt > largestPoolSize)
 9             largestPoolSize = nt;
10     }
11     return t;
12 }

第二行能夠看到,線程池中真正執行的線程是由名爲Worker的內部類來執行的,關於Worker的主要結構和方法以下:spa

注:addThread方法的註釋中強調了要在持有mainLock的鎖時才能調用,mainLock鎖在線程池的安全併發的實現中擔任着很是重要的角色,而且對於firstTask,有一點不一樣的邏輯在,因爲篇幅有限,本文這裏不作重點解讀了線程

 1 private final class Worker implements Runnable {
 2 
 3   // others codes
 4 
 5   /**
 6    * Main run loop
 7    */
 8   public void run() {
 9       try {
10           Runnable task = firstTask;
11           firstTask = null;
12           while (task != null || (task = getTask()) != null) {
13               runTask(task);
14               task = null;
15           }
16       } finally {
17           workerDone(this);
18       }
19   }
20 }

能夠看到,Worker實現了Runnable接口,線程池中執行的線程實際上是Worker的run()方法。而第13行的runTask(task)方法的實現是直接調用了提交到線程池中的Runnable任務的run方法(具體代碼請自行查看源碼,這裏再也不列出,其中還包含一些針對shutdown和shutdownNow的邏輯),還有比較重要的是第12行的getTask()方法,最後來看getTask()的源碼:

 1 Runnable getTask() {
 2     for (;;) {
 3         try {
 4             int state = runState;
 5             if (state > SHUTDOWN)
 6                 return null;
 7             Runnable r;
 8             if (state == SHUTDOWN)  // Help drain queue
 9                 r = workQueue.poll();
10             else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
11                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
12             else
13                 r = workQueue.take();
14             if (r != null)
15                 return r;
16             if (workerCanExit()) {
17                 if (runState >= SHUTDOWN) // Wake up others
18                     interruptIdleWorkers();
19                 return null;
20             }
21             // Else retry
22         } catch (InterruptedException ie) {
23             // On interruption, re-check runState
24         }
25     }
26 }

以上代碼第13行將線程池保持線程不關閉的實現已經展現出來了:由一個死循環不斷的從隊列中取出提交到線程池中的Runnable任務,而後直接調用其run()方法便可

基於這個原理,咱們就會很容易的看懂其它的一些特性。

讓咱們先回頭看看關於「核心線程」的源碼,回到最開始的execute()的源碼:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 5         if (runState == RUNNING && workQueue.offer(command)) {
 6             if (runState != RUNNING || poolSize == 0)
 7                 ensureQueuedTaskHandled(command);
 8         }
 9         else if (!addIfUnderMaximumPoolSize(command))
10             reject(command); // is shutdown or saturated
11     }
12 }

前面咱們說根據第4行,當線程池中當前線程數小於核心線程數時,執行addIfUnderCorePoolSize(command)方法並再也不執行後面的代碼。而噹噹前線程數大於等於核心線程數時,就會直接執行第5行的workQueue.offer(command),將新任務添加到名爲workQueue隊列中,也就是死循環中不斷取Runnable任務的隊列。這裏這個workQueue是由構造方法傳進來的workQueue隊列。經過Executors建立線程池的一、三、4條種類能夠看出,核心線程=最大線程的線程池,使用最大容量(Integer.MAX_VALUE)的LinkedBlockingQueue隊列,就是說,線程池沒法擴展,超出的Runnable任務所有進入阻塞隊列中,等待Worker執行完。而核心線程<最大線程的線程池,使用無容量的SynchronousQueue隊列,就是說,線程池能夠無限擴展,擴展的線程所有新建Worker並執行。但根據getTask()方法的第10行第11行,超出核心線程數的Worker,空閒時只會存活keepAliveTime時間(構造方法的參數)。

OK,到這裏,經過源碼已經解釋了ThreadPoolExecutor線程池主要的特性的實現原理。

上面羅裏吧嗦的一大堆主要說明了JDK源碼中實現的ThreadPoolExecutor線程池的如下幾個主要特性(來自JDK API的描述):

核心線程數與最大線程數的意義:

ThreadPoolExecutor將根據corePoolSize和maximumPoolSize設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable)中提交時,若是運行的線程少於corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於corePoolSize而少於maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的corePoolSize和maximumPoolSize相同,則建立了固定大小的線程池。若是將 maximumPoolSize設置爲基本的無界值(如Integer.MAX_VALUE),則容許池適應任意數量的併發任務。

保持活動時間:

若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止

排隊:

全部 BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:

  • 若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
  • 若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
  • 若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。 

其它的特性,如終止線程池的幾種方式及被拒絕的任務由構造方法傳入的handler處理等本文並未給出源碼解讀,感興趣的讀者可自行查看JDK源碼。

另外,關於ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,本文不打算詳細介紹了。其核心原理是同樣的,只是多了「Schedule」的功能。而這個任務調度的功能是經過構造時傳入的DelayQueue來實現的,你們若是感興趣能夠看下DelayQueue的介紹:「Delayed元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素」。「延遲期滿」的原理是經過lock包中ReadWriteLock鎖獲取的Condition的awaitNanos(long nanosTimeout)方法來實現的。

總結

本文經過部分關鍵處源碼的解讀,介紹了ThreadPoolExecutor線程池的實現原理。我我的簡單總結爲兩點:

  • 線程池中真正執行的線程是由名爲Worker的內部類來執行的
  • 執行的方式是由一個死循環不斷的從隊列中取出提交到線程池中的Runnable任務,而後直接調用其run()方法

這兩點只是作歸納,真正展開來描述,仍是有不少細節的。 

相關文章
相關標籤/搜索