線程池是很是重要的工具,若是你要成爲一個好的工程師,仍是得比較好地掌握這個知識。即便你爲了謀生,也要知道,這基本上是面試必問的題目,並且面試官很容易從被面試者的回答中捕捉到被面試者的技術水平。java
本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java8 都同樣),建議想好好看的讀者抽出至少 15 至 30 分鐘的整塊時間來閱讀。固然,若是讀者僅爲面試準備,能夠直接滑到最後的總結部分。面試
開篇來一些廢話。下圖是 java 線程池幾個相關類的繼承結構:
多線程
先簡單說說這個繼承結構,Executor 位於最頂層,也是最簡單的,就一個 execute(Runnable runnable) 接口方法定義。併發
ExecutorService 也是接口,在 Executor 接口的基礎上添加了不少的接口方法,因此通常來講咱們會使用這個接口。app
而後再下來一層是 AbstractExecutorService,從名字咱們就知道,這是抽象類,這裏實現了很是有用的一些方法供子類直接使用,以後咱們再細說。工具
而後纔到咱們的重點部分 ThreadPoolExecutor 類,這個類提供了關於線程池所需的很是豐富的功能。oop
另外,咱們還涉及到下圖中的這些類:
源碼分析
同在併發包中的 Executors 類,類名中帶字母 s,咱們猜到這個是工具類,裏面的方法都是靜態方法,如如下咱們最經常使用的用於生成 ThreadPoolExecutor 的實例的一些方法:性能
public static ExecutorService newCachedThreadPool() {ui
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
另外,因爲線程池支持獲取線程執行的結果,因此,引入了 Future 接口,RunnableFuture 繼承自此接口,而後咱們最須要關心的就是它的實現類 FutureTask。到這裏,記住這個概念,在線程池的使用過程當中,咱們是往線程池提交任務(task),使用過線程池的都知道,咱們提交的每一個任務是實現了 Runnable 接口的,其實就是先將 Runnable 的任務包裝成 FutureTask,而後再提交到線程池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),而後具備 Future 接口的語義,便可以在未來(Future)獲得執行的結果。
固然,線程池中的 BlockingQueue 也是很是重要的概念,若是線程數達到 corePoolSize,咱們的每一個任務會提交到等待隊列中,等待線程池中的線程來取任務並執行。這裏的 BlockingQueue 一般咱們使用其實現類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每一個實現類都有不一樣的特徵,使用場景以後會慢慢分析。想要詳細瞭解各個 BlockingQueue 的讀者,能夠參考個人前面的一篇對 BlockingQueue 的各個實現類進行詳細分析的文章。
把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務實現類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用於實現定時執行。不過本文不會介紹它的實現,我相信讀者看完本文後能夠比較容易地看懂它的源碼。
以上就是本文要介紹的知識,廢話很少說,開始進入正文。
/*
*/
public interface Executor {
void execute(Runnable command);
}
咱們能夠看到 Executor 接口很是簡單,就一個 void execute(Runnable command) 方法,表明提交一個任務。爲了讓你們理解 java 線程池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。
咱們常常這樣啓動一個線程:
new Thread(new Runnable(){
// do something
}).start();
用了線程池 Executor 後就能夠像下面這麼使用:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
若是咱們但願線程池同步執行每個任務,咱們能夠這麼實現這個接口:
class DirectExecutor implements Executor {
public void execute(Runnable r) { r.run();// 這裏不是用的new Thread(r).start(),也就是說沒有啓動任何一個新的線程。 }
}
咱們但願每一個任務提交進來後,直接啓動一個新的線程來執行這個任務,咱們能夠這麼實現:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); // 每一個任務都用一個新的線程來執行 }
}
咱們再來看下怎麼組合兩個 Executor 來使用,下面這個實現是將全部的任務都加到一個 queue 中,而後從 queue 中取任務,交給真正的執行器執行,這裏採用 synchronized 進行併發控制:
class SerialExecutor implements Executor {
// 任務隊列 final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); // 這個纔是真正的執行器 final Executor executor; // 當前正在執行的任務 Runnable active; // 初始化的時候,指定執行器 SerialExecutor(Executor executor) { this.executor = executor; } // 添加任務到線程池: 將任務添加到任務隊列,scheduleNext 觸發執行器去任務隊列取任務 public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { // 具體的執行轉給真正的執行器 executor executor.execute(active); } }
}
固然了,Executor 這個接口只有提交任務的功能,太簡單了,咱們想要更豐富的功能,好比咱們想知道執行結果、咱們想知道當前線程池有多少個線程活着、已經完成了多少任務等等,這些都是這個接口的不足的地方。接下來咱們要介紹的是繼承自 Executor 接口的 ExecutorService 接口,這個接口提供了比較豐富的功能,也是咱們最常使用到的接口。
通常咱們定義一個線程池的時候,每每都是使用這個接口:
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
由於這個接口中定義的一系列方法大部分狀況下已經能夠知足咱們的須要了。
那麼咱們簡單初略地來看一下這個接口中都有哪些方法:
public interface ExecutorService extends Executor {
// 關閉線程池,已提交的任務繼續執行,不接受繼續提交新任務 void shutdown(); // 關閉線程池,嘗試中止正在執行的全部任務,不接受繼續提交新任務 // 它和前面的方法相比,加了一個單詞「now」,區別在於它會去中止當前正在進行的任務 List<Runnable> shutdownNow(); // 線程池是否已關閉 boolean isShutdown(); // 若是調用了 shutdown() 或 shutdownNow() 方法後,全部任務結束了,那麼返回true // 這個方法必須在調用shutdown或shutdownNow方法以後調用纔會返回true boolean isTerminated(); // 等待全部任務完成,並設置超時時間 // 咱們這麼理解,實際應用中是,先調用 shutdown 或 shutdownNow, // 而後再調這個方法等待全部的線程真正地完成,返回值意味着有沒有超時 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交一個 Callable 任務 <T> Future<T> submit(Callable<T> task); // 提交一個 Runnable 任務,第二個參數將會放到 Future 中,做爲返回值, // 由於 Runnable 的 run 方法自己並不返回任何東西 <T> Future<T> submit(Runnable task, T result); // 提交一個 Runnable 任務 Future<?> submit(Runnable task); // 執行全部任務,返回 Future 類型的一個 list <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 也是執行全部任務,可是這裏設置了超時時間 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 只有其中的一個任務結束了,就能夠返回,返回執行完的那個任務的結果 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 同上一個方法,只有其中的一個任務結束了,就能夠返回,返回執行完的那個任務的結果, // 不過這個帶超時,超過指定的時間,拋出 TimeoutException 異常 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
這些方法都很好理解,一個簡單的線程池主要就是這些功能,能提交任務,能獲取結果,能關閉線程池,這也是爲何咱們常常用這個接口的緣由。
在繼續往下層介紹 ExecutorService 的實現類以前,咱們先來講說相關的類 FutureTask。
Future -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture
FutureTask 經過 RunnableFuture 間接實現了 Runnable 接口,
因此每一個 Runnable 一般都先包裝成 FutureTask,
而後調用 executor.execute(Runnable command) 將其提交給線程池
咱們知道,Runnable 的 void run() 方法是沒有返回值的,因此,一般,若是咱們須要的話,會在 submit 中指定第二個參數做爲返回值:
<T> Future<T> submit(Runnable task, T result);
其實到時候會經過這兩個參數,將其包裝成 Callable。
Callable 也是由於線程池的須要,因此纔有了這個接口。它和 Runnable 的區別在於 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,若是運行出現異常,call() 方法會拋出異常。
public interface Callable<V> {
V call() throws Exception;
}
在這裏,就不展開說 FutureTask 類了,由於本文篇幅原本就夠大了,這裏咱們須要知道怎麼用就好了。
下面,咱們來看看 ExecutorService 的抽象實現 AbstractExecutorService 。
AbstractExecutorService 抽象類派生自 ExecutorService 接口,而後在其基礎上實現了幾個實用的方法,這些方法提供給子類進行調用。
這個抽象類實現了 invokeAny 方法和 invokeAll 方法,這裏的兩個 newTaskFor 方法也比較有用,用於將任務包裝成 FutureTask。定義於最上層接口 Executor中的 void execute(Runnable command) 因爲不須要獲取結果,不會進行 FutureTask 的包裝。
須要獲取結果(FutureTask),用 submit 方法,不須要獲取結果,能夠用 execute 方法。
下面,我將一行一行源碼地來分析這個類,跟着源碼來看看其實現吧:
Tips: invokeAny 和 invokeAll 方法佔了這整個類的絕大多數篇幅,讀者能夠選擇適當跳過,由於它們可能在你的實踐中使用的頻次比較低,並且它們不帶有承前啓後的做用,不用擔憂會漏掉什麼致使看不懂後面的代碼。
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用於獲取執行結果的,咱們經常使用它的子類 FutureTask // 下面兩個 newTaskFor 方法用於將咱們的任務包裝成 FutureTask 提交到線程池中執行 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // 提交任務 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 1. 將任務包裝成 FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); // 2. 交給執行器執行,execute 方法由具體的子類來實現 // 前面也說了,FutureTask 間接實現了Runnable 接口。 execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 1. 將任務包裝成 FutureTask RunnableFuture<T> ftask = newTaskFor(task, result); // 2. 交給執行器執行 execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 1. 將任務包裝成 FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 2. 交給執行器執行 execute(ftask); return ftask; } // 此方法目的:將 tasks 集合中的任務提交到線程池執行,任意一個線程執行完後就能夠結束了 // 第二個參數 timed 表明是否設置超時機制,超時時間爲第三個參數, // 若是 timed 爲 true,同時超時了尚未一個線程返回結果,那麼拋出 TimeoutException 異常 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); // 任務數 int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); // List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); // ExecutorCompletionService 不是一個真正的執行器,參數 this 纔是真正的執行器 // 它對執行器進行了包裝,每一個任務結束後,將結果保存到內部的一個 completionQueue 隊列中 // 這也是爲何這個類的名字裏面有個 Completion 的緣由吧。 ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { // 用於保存異常信息,此方法若是沒有獲得任何有效的結果,那麼咱們能夠拋出最後獲得的一個異常 ExecutionException ee = null; long lastTime = timed ? System.nanoTime() : 0; Iterator<? extends Callable<T>> it = tasks.iterator(); // 首先先提交一個任務,後面的任務到下面的 for 循環一個個提交 futures.add(ecs.submit(it.next())); // 提交了一個任務,因此任務數量減 1 --ntasks; // 正在執行的任務數(提交的時候 +1,任務結束的時候 -1) int active = 1; for (;;) { // ecs 上面說了,其內部有一個 completionQueue 用於保存執行完成的結果 // BlockingQueue 的 poll 方法不阻塞,返回 null 表明隊列爲空 Future<T> f = ecs.poll(); // 爲 null,說明剛剛提交的第一個線程尚未執行完成 // 在前面先提交一個任務,加上這裏作一次檢查,也是爲了提升性能 if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 這裏是 else if,不是 if。這裏說明,沒有任務了,同時 active 爲 0 說明 // 任務都執行完成了。其實我也沒理解爲何這裏作一次 break? // 由於我認爲 active 爲 0 的狀況,必然從下面的 f.get() 返回了 // 2018-02-23 感謝讀者 newmicro 的 comment, // 這裏的 active == 0,說明全部的任務都執行失敗,那麼這裏是 for 循環出口 else if (active == 0) break; // 這裏也是 else if。這裏說的是,沒有任務了,可是設置了超時時間,這裏檢測是否超時 else if (timed) { // 帶等待的 poll 方法 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 若是已經超時,拋出 TimeoutException 異常,這整個方法就結束了 if (f == null) throw new TimeoutException(); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } // 這裏是 else。說明,沒有任務須要提交,可是池中的任務沒有完成,尚未超時(若是設置了超時) // take() 方法會阻塞,直到有元素返回,說明有任務結束了 else f = ecs.take(); } /* * 我感受上面這一段並非很好理解,這裏簡單說下。 * 1. 首先,這在一個 for 循環中,咱們設想每個任務都沒那麼快結束, * 那麼,每一次都會進到第一個分支,進行提交任務,直到將全部的任務都提交了 * 2. 任務都提交完成後,若是設置了超時,那麼 for 循環其實進入了「一直檢測是否超時」 這件事情上 * 3. 若是沒有設置超時機制,那麼沒必要要檢測超時,那就會阻塞在 ecs.take() 方法上, 等待獲取第一個執行結果 * 4. 若是全部的任務都執行失敗,也就是說 future 都返回了, 可是 f.get() 拋出異常,那麼從 active == 0 分支出去(感謝 newmicro 提出) // 固然,這個須要看下面的 if 分支。 */ // 有任務結束了 if (f != null) { --active; try { // 返回執行結果,若是有異常,都包裝成 ExecutionException return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } }// 注意看 for 循環的範圍,一直到這裏 if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 方法退出以前,取消其餘的任務 for (Future<T> f : futures) f.cancel(true); } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } // 執行全部的任務,返回任務結果。 // 先不要看這個方法,咱們先想一想,其實咱們本身提交任務到線程池,也是想要線程池執行全部的任務 // 只不過,咱們是每次 submit 一個任務,這裏以一個集合做爲參數提交 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { // 這個很簡單 for (Callable<T> t : tasks) { // 包裝成 FutureTask RunnableFuture<T> f = newTaskFor(t); futures.add(f); // 提交任務 execute(f); } for (Future<T> f : futures) { if (!f.isDone()) { try { // 這是一個阻塞方法,直到獲取到值,或拋出了異常 // 這裏有個小細節,其實 get 方法簽名上是會拋出 InterruptedException 的 // 但是這裏沒有進行處理,而是拋給外層去了。此異常發生於還沒執行完的任務被取消了 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // 這個方法返回,不像其餘的場景,返回 List<Future>,其實執行結果還沒出來 // 這個方法返回是真正的返回,任務都結束了 return futures; } finally { // 爲何要這個?就是上面說的有異常的狀況 if (!done) for (Future<T> f : futures) f.cancel(true); } } // 帶超時的 invokeAll,咱們找不一樣吧 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); long lastTime = System.nanoTime(); Iterator<Future<T>> it = futures.iterator(); // 提交一個任務,檢測一次是否超時 while (it.hasNext()) { execute((Runnable)(it.next())); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; // 超時 if (nanos <= 0) return futures; } for (Future<T> f : futures) { if (!f.isDone()) { if (nanos <= 0) return futures; try { // 調用帶超時的 get 方法,這裏的參數 nanos 是剩餘的時間, // 由於上面其實已經用掉了一些時間了 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } } done = true; return futures; } finally { if (!done) for (Future<T> f : futures) f.cancel(true); } }
}
到這裏,咱們發現,這個抽象類包裝了一些基本的方法,但是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啓線程來執行任務,它們都只是在方法內部調用了 execute 方法,因此最重要的 execute(Runnable runnable) 方法還沒出現,須要等具體執行器來實現這個最重要的部分,這裏咱們要說的就是 ThreadPoolExecutor 類了。
鑑於本文的篇幅,我以爲看到這裏的讀者應該已經很少了,快餐文化使然啊!我寫的每篇文章都力求讓讀者能夠經過個人一篇文章而記住全部的相關知識點,因此篇幅難免長了些。其實,工做了不少年的話,會有一個感受,好比說線程池,即便看了 20 篇各類總結,也不如一篇長文實實在在講解清楚每個知識點,有點少便是多,多便是少的意味了。
ThreadPoolExecutor 是 JDK 中的線程池實現,這個類實現了一個線程池須要的各個方法,它實現了任務提交、線程管理、監控等等方法。
咱們能夠基於它來進行業務上的擴展,以實現咱們須要的其餘功能,好比實現定時任務的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。固然,這不是本文關注的重點,下面,仍是趕忙進行源碼分析吧。
首先,咱們來看看線程池實現中的幾個概念和處理流程。
咱們先回顧下提交任務的幾個方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;
}
一個最基本的概念是,submit 方法中,參數是 Runnable 類型(也有Callable 類型),這個參數不是用於 new Thread(runnable).start() 中的,此處的這個參數不是用於啓動線程的,這裏指的是任務,任務要作的事情是 run() 方法裏面定義的或 Callable 中的 call() 方法裏面定義的。
初學者每每會搞混這個,由於 Runnable 老是在各個地方出現,常常把一個 Runnable 包到另外一個 Runnable 中。請把它想象成有個 Task 接口,這個接口裏面有一個 run() 方法(我想做者只是不想由於這個再定義一個徹底能夠用 Runnable 來代替的接口,Callable 的出現,徹底是由於 Runnable 不能知足須要)。
咱們回過神來繼續往下看,我畫了一個簡單的示意圖來描述線程池中的一些主要的構件:
固然,上圖沒有考慮隊列是否有界,提交任務時隊列滿了怎麼辦?什麼狀況下會建立新的線程?提交任務時線程池滿了怎麼辦?空閒線程怎麼關掉?這些問題下面咱們會一一解決。
咱們常常會使用 Executors 這個工具類來快速構造一個線程池,對於初學者而言,這種工具類是頗有用的,開發者不須要關注太多的細節,只要知道本身須要一個線程池,僅僅提供必需的參數就能夠了,其餘參數都採用做者提供的默認值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
這裏先不說有什麼區別,它們最終都會導向這個構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 這幾個參數都是必需要有的 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
基本上,上面的構造方法中列出了咱們最須要關心的幾個屬性了,下面逐個介紹下構造方法中出現的這幾個屬性:
corePoolSize
核心線程數,不要摳字眼,反正先記着有這麼個屬性就能夠了。
maximumPoolSize
最大線程數,線程池容許建立的最大線程數。
workQueue
任務隊列,BlockingQueue 接口的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。
keepAliveTime
空閒線程的保活時間,若是某線程的空閒時間超過這個值都沒有任務給它作,那麼能夠被關閉了。注意這個值並不會對全部線程起做用,若是線程池中的線程數少於等於核心線程數 corePoolSize,那麼這些線程不會由於空閒太長時間而被關閉,固然,也能夠經過調用 allowCoreThreadTimeOut(true)使核心線程數內的線程也能夠被回收。
threadFactory
用於生成線程,通常咱們能夠用默認的就能夠了。一般,咱們能夠經過它將咱們的線程的名字設置得比較可讀一些,如 Message-Thread-1, Message-Thread-2 相似這樣。
handler:
當線程池已經滿了,可是又有新的任務提交的時候,該採起什麼策略由這個來指定。有幾種方式可供選擇,像拋出異常、直接拒絕而後返回等,也能夠本身實現相應的接口實現本身的邏輯,這個以後再說。
除了上面幾個屬性外,咱們再看看其餘重要的屬性。
Doug Lea 採用一個 32 位的整數來存放線程池的狀態和當前池中的線程數,其中高 3 位用於存放線程池狀態,低 29 位表示線程數(即便只有 29 位,也已經不小了,大概 5 億多,如今尚未哪一個機器能起這麼多線程的吧)。咱們知道,java 語言在整數編碼上是統一的,都是採用補碼的形式,下面是簡單的移位操做和布爾操做,都是挺簡單的。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 這裏 COUNT_BITS 設置爲 29(32-3),意味着前三位用於存放線程狀態,後29位用於存放線程數
// 不少初學者很喜歡在本身的代碼中寫不少 29 這種數字,或者某個特殊的字符串,而後分佈在各個地方,這是很是糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 這裏獲得的是 29 個 1,也就是說線程池的最大線程數是 2^29-1=536870911
// 以咱們如今計算機的實際狀況,這個數量仍是夠用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 咱們說了,線程池的狀態存放在高 3 位中
// 運算結果爲 111跟29個0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 將整數 c 的低 29 位修改成 0,就獲得了線程池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 將整數 c 的高 3 爲修改成 0,就獲得了線程池中的線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
上面就是對一個整數的簡單的位操做,幾個操做方法將會在後面的源碼中一直出現,因此讀者最好把方法名字和其表明的功能記住,看源碼的時候也就不須要來來回回翻了。
在這裏,介紹下線程池中的各個狀態和狀態變化的轉換過程:
RUNNING:這個沒什麼好說的,這是最正常的狀態:接受新的任務,處理等待隊列中的任務
SHUTDOWN:不接受新的任務提交,可是會繼續處理等待隊列中的任務
STOP:不接受新的任務提交,再也不處理等待隊列中的任務,中斷正在執行任務的線程
TIDYING:全部的任務都銷燬了,workCount 爲 0。線程池的狀態在轉換爲 TIDYING 狀態時,會執行鉤子方法 terminated()
TERMINATED:terminated() 方法結束後,線程池的狀態就會變成這個
RUNNING 定義爲 -1,SHUTDOWN 定義爲 0,其餘的都比 0 大,因此等於 0 的時候不能提交任務,大於 0 的話,連正在執行的任務也須要中斷。
看了這幾種狀態的介紹,讀者大致也能夠猜到十之八九的狀態轉換了,各個狀態的轉換過程有如下幾種:
RUNNING -> SHUTDOWN:當調用了 shutdown() 後,會發生這個狀態轉換,這也是最重要的
(RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 後,會發生這個狀態轉換,這下要清楚 shutDown() 和 shutDownNow() 的區別了
SHUTDOWN -> TIDYING:當任務隊列和線程池都清空後,會由 SHUTDOWN 轉換爲 TIDYING
STOP -> TIDYING:當任務隊列清空後,發生這個轉換
TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束後
上面的幾個記住核心的就能夠了,尤爲第一個和第二個。
另外,咱們還要看看一個內部類 Worker,由於 Doug Lea 把線程池中的線程包裝成了一個個 Worker,翻譯成工人,就是線程池中作任務的線程。因此到這裏,咱們知道任務是 Runnable(內部叫 task 或 command),線程是 Worker。
Worker 這裏又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在併發中真的是處處出現,並且很是容易使用,寫少許的代碼就能實現本身須要的同步方式(對 AQS 源碼感興趣的讀者請參看我以前寫的幾篇文章)。
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L; // 這個是真正的線程,任務靠你啦 final Thread thread; // 前面說了,這裏的 Runnable 是任務。爲何叫 firstTask?由於在建立線程的時候,若是同時指定了 // 這個線程起來之後須要執行的第一個任務,那麼第一個任務就是存放在這裏的(線程可不止執行這一個任務) // 固然了,也能夠爲 null,這樣線程起來了,本身到任務隊列(BlockingQueue)中取任務(getTask 方法)就好了 Runnable firstTask; // 用於存放此線程徹底的任務數,注意了,這裏用了 volatile,保證可見性 volatile long completedTasks; // Worker 只有這一個構造方法,傳入 firstTask,也能夠傳 null Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 調用 ThreadFactory 來建立一個新的線程 this.thread = getThreadFactory().newThread(this); } // 這裏調用了外部類的 runWorker 方法 public void run() { runWorker(this); } ...// 其餘幾個方法沒什麼好看的,就是用 AQS 操做,來獲取這個線程的執行權,用了獨佔鎖
}
前面雖然囉嗦,可是簡單。有了上面的這些基礎後,咱們終於能夠看看 ThreadPoolExecutor 的 execute 方法了,前面源碼分析的時候也說了,各類方法都最終依賴於 execute 方法:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); // 前面說的那個表示 「線程池狀態」 和 「線程數」 的整數 int c = ctl.get(); // 若是當前線程數少於核心線程數,那麼直接添加一個 worker 來執行任務, // 建立一個新的線程,並把當前任務 command 做爲這個線程的第一個任務(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任務成功,那麼就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就能夠返回了 // 至於執行的結果,到時候會包裝到 FutureTask 中。 // 返回 false 表明線程池不容許提交任務 if (addWorker(command, true)) return; c = ctl.get(); } // 到這裏說明,要麼當前線程數大於等於核心線程數,要麼剛剛 addWorker 失敗了 // 若是線程池處於 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 這裏面說的是,若是任務進入了 workQueue,咱們是否須要開啓新的線程 * 由於線程數在 [0, corePoolSize) 是無條件開啓新的線程 * 若是線程數已經大於等於 corePoolSize,那麼將任務添加到隊列中,而後進到這裏 */ int recheck = ctl.get(); // 若是線程池已不處於 RUNNING 狀態,那麼移除已經入隊的這個任務,而且執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是線程池仍是 RUNNING 的,而且線程數爲 0,那麼開啓新的線程 // 到這裏,咱們知道了,這塊代碼的真正意圖是:擔憂任務提交到隊列中了,可是線程都關閉了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是 workQueue 隊列滿了,那麼進入到這個分支 // 以 maximumPoolSize 爲界建立新的 worker, // 若是失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略 else if (!addWorker(command, false)) reject(command);
}
對建立線程的錯誤理解:若是線程數少於 corePoolSize,建立一個線程,若是線程數在 [corePoolSize, maximumPoolSize] 之間那麼能夠建立線程或複用空閒線程,keepAliveTime 對這個區間的線程有效。
從上面的幾個分支,咱們就能夠看出,上面的這段話是錯誤的。
上面這些一時半會也不可能所有消化搞定,咱們先繼續往下吧,到時候再回頭看幾遍。
這個方法很是重要 addWorker(Runnable firstTask, boolean core) 方法,咱們看看它是怎麼建立新的線程的:
// 第一個參數是準備提交給這個線程執行的任務,以前說了,能夠爲 null
// 第二個參數爲 true 表明使用核心線程數 corePoolSize 做爲建立線程的界線,也就說建立這個線程的時候,
// 若是線程池中的線程總數已經達到 corePoolSize,那麼不能響應此次建立線程的請求
// 若是是 false,表明使用最大線程數 maximumPoolSize 做爲界線
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 這個很是很差理解 // 若是線程池已關閉,並知足如下條件之一,那麼不建立新的 worker: // 1. 線程池狀態大於 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED // 2. firstTask != null // 3. workQueue.isEmpty() // 簡單分析下: // 仍是狀態控制的問題,當線程池處於 SHUTDOWN 的時候,不容許提交任務,可是已有的任務繼續執行 // 當狀態大於 SHUTDOWN 時,不容許提交任務,且中斷正在執行的任務 // 多說一句:若是線程池處於 SHUTDOWN,可是 firstTask 爲 null,且 workQueue 非空,那麼是容許建立 worker 的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 若是成功,那麼就是全部建立線程前的條件校驗都知足了,準備建立線程執行任務了 // 這裏失敗的話,說明有其餘線程也在嘗試往線程池中建立線程 if (compareAndIncrementWorkerCount(c)) break retry; // 因爲有併發,從新再讀取一下 ctl c = ctl.get(); // 正常若是是 CAS 失敗的話,進到下一個裏層的for循環就能夠了 // 但是若是是由於其餘線程的操做,致使線程池的狀態發生了變動,若有其餘線程關閉了這個線程池 // 那麼須要回到外層的for循環 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* * 到這裏,咱們認爲在當前這個時刻,能夠開始建立線程來執行任務了, * 由於該校驗的都校驗了,至於之後會發生什麼,那是之後的事,至少當前是知足條件的 */ // worker 是否已經啓動 boolean workerStarted = false; // 是否已將這個 worker 添加到 workers 這個 HashSet 中 boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 把 firstTask 傳給 worker 的構造方法 w = new Worker(firstTask); // 取 worker 中的線程對象,以前說了,Worker的構造方法會調用 ThreadFactory 來建立一個新的線程 final Thread t = w.thread; if (t != null) { // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操做「瓜熟蒂落」, // 由於關閉一個線程池須要這個鎖,至少我持有鎖的期間,線程池不會被關閉 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 小於 SHUTTDOWN 那就是 RUNNING,這個自沒必要說,是最正常的狀況 // 若是等於 SHUTDOWN,前面說了,不接受新的任務,可是會繼續執行等待隊列中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker 裏面的 thread 可不能是已經啓動的 if (t.isAlive()) throw new IllegalThreadStateException(); // 加到 workers 這個 HashSet 中 workers.add(w); int s = workers.size(); // largestPoolSize 用於記錄 workers 中的個數的最大值 // 由於 workers 是不斷增長減小的,經過這個值能夠知道線程池的大小曾經達到的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功的話,啓動這個線程 if (workerAdded) { // 啓動線程 t.start(); workerStarted = true; } } } finally { // 若是線程沒有啓動,須要作一些清理工做,如前面 workCount 加了 1,將其減掉 if (! workerStarted) addWorkerFailed(w); } // 返回線程是否啓動成功 return workerStarted;
}
簡單看下 addWorkFailed 的處理:
// workers 中刪除掉相應的 worker
// workCount 減 1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); // rechecks for termination, in case the existence of this worker was holding up termination tryTerminate(); } finally { mainLock.unlock(); }
}
回過頭來,繼續往下走。咱們知道,worker 中的線程 start 後,其 run 方法會調用 runWorker 方法:
// Worker 類的 run() 方法
public void run() {
runWorker(this);
}
繼續往下看 runWorker 方法:
// 此方法由 worker 線程啓動後調用,這裏用一個 while 循環來不斷地從等待隊列中獲取任務並執行
// 前面說了,worker 在初始化的時候,能夠指定 firstTask,那麼第一個任務也就能夠不須要從隊列中獲取
final void runWorker(Worker w) {
// Thread wt = Thread.currentThread(); // 該線程的第一個任務(若是有的話) Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循環調用 getTask 獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // 若是線程池狀態大於等於 STOP,那麼意味着該線程也要中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 這是一個鉤子方法,留給須要的子類實現 beforeExecute(wt, task); Throwable thrown = null; try { // 到這裏終於能夠執行任務了 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 這裏不容許拋出 Throwable,因此轉換爲 Error thrown = x; throw new Error(x); } finally { // 也是一個鉤子方法,將 task 和異常做爲參數,留給須要的子類實現 afterExecute(task, thrown); } } finally { // 置空 task,準備 getTask 獲取下一個任務 task = null; // 累加完成的任務數 w.completedTasks++; // 釋放掉 worker 的獨佔鎖 w.unlock(); } } completedAbruptly = false; } finally { // 若是到這裏,須要執行線程關閉: // 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉 // 2. 任務執行過程當中發生了異常 // 第一種狀況,已經在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中會說 // 第二種狀況,workCount 沒有進行處理,因此須要在 processWorkerExit 中處理 // 限於篇幅,我不許備分析這個方法了,感興趣的讀者請自行分析源碼 processWorkerExit(w, completedAbruptly); }
}
咱們看看 getTask() 是怎麼獲取任務的,這個方法寫得真的很好,每一行都很簡單,組合起來卻全部的狀況都想好了:
// 此方法有三種可能:
// 1. 阻塞直到獲取到任務返回。咱們知道,默認 corePoolSize 以內的線程是不會被回收的,
// 它們會一直等待任務
// 2. 超時退出。keepAliveTime 起做用的時候,也就是若是這麼多時間內都沒有任務,那麼應該執行關閉
// 3. 若是發生瞭如下條件,此方法必須返回 null:
// - 池中有大於 maximumPoolSize 個 workers 存在(經過調用 setMaximumPoolSize 進行設置)
// - 線程池處於 SHUTDOWN,並且 workQueue 是空的,前面說了,這種再也不接受新的任務
// - 線程池處於 STOP,不只不接受新的線程,連 workQueue 中的線程也再也不執行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 兩種可能 // 1. rs == SHUTDOWN && workQueue.isEmpty() // 2. rs >= STOP if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操做,減小工做線程數 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // 容許核心線程數內的線程回收,或當前線程數超過了核心線程數,那麼有可能發生超時關閉 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 這裏 break,是爲了避免往下執行後一個 if (compareAndDecrementWorkerCount(c)) // 兩個 if 一塊兒看:若是當前線程數 wc > maximumPoolSize,或者超時,都返回 null // 那這裏的問題來了,wc > maximumPoolSize 的狀況,爲何要返回 null? // 換句話說,返回 null 意味着關閉線程。 // 那是由於有可能開發者調用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調小了 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數發生了改變 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } // wc <= maximumPoolSize 同時沒有超時 try { // 到 workQueue 中獲取任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 若是此 worker 發生了中斷,採起的方案是重試 // 解釋下爲何會發生中斷,這個讀者要去看 setMaximumPoolSize 方法, // 若是開發者將 maximumPoolSize 調小了,致使其小於當前的 workers 數量, // 那麼意味着超出的部分線程要被關閉。從新進入 for 循環,天然會有部分線程會返回 null timedOut = false; } }
}
到這裏,基本上也說完了整個流程,讀者這個時候應該回到 execute(Runnable command) 方法,看看各個分支,我把代碼貼過來一下:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); // 前面說的那個表示 「線程池狀態」 和 「線程數」 的整數 int c = ctl.get(); // 若是當前線程數少於核心線程數,那麼直接添加一個 worker 來執行任務, // 建立一個新的線程,並把當前任務 command 做爲這個線程的第一個任務(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任務成功,那麼就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就能夠返回了 // 至於執行的結果,到時候會包裝到 FutureTask 中。 // 返回 false 表明線程池不容許提交任務 if (addWorker(command, true)) return; c = ctl.get(); } // 到這裏說明,要麼當前線程數大於等於核心線程數,要麼剛剛 addWorker 失敗了 // 若是線程池處於 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 這裏面說的是,若是任務進入了 workQueue,咱們是否須要開啓新的線程 * 由於線程數在 [0, corePoolSize) 是無條件開啓新的線程 * 若是線程數已經大於等於 corePoolSize,那麼將任務添加到隊列中,而後進到這裏 */ int recheck = ctl.get(); // 若是線程池已不處於 RUNNING 狀態,那麼移除已經入隊的這個任務,而且執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是線程池仍是 RUNNING 的,而且線程數爲 0,那麼開啓新的線程 // 到這裏,咱們知道了,這塊代碼的真正意圖是:擔憂任務提交到隊列中了,可是線程都關閉了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是 workQueue 隊列滿了,那麼進入到這個分支 // 以 maximumPoolSize 爲界建立新的 worker, // 若是失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略 else if (!addWorker(command, false)) reject(command);
}
上面各個分支中,有兩種狀況會調用 reject(command) 來處理任務,由於按照正常的流程,線程池此時不能接受這個任務,因此須要執行咱們的拒絕策略。接下來,咱們說一說 ThreadPoolExecutor 中的拒絕策略。
final void reject(Runnable command) {
// 執行拒絕策略 handler.rejectedExecution(command, this);
}
此處的 handler 咱們須要在構造線程池的時候就傳入這個參數,它是 RejectedExecutionHandler 的實例。
RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經定義好的實現類可供咱們直接使用,固然,咱們也能夠實現本身的策略,不過通常也沒有必要。
// 只要線程池沒有被關閉,那麼由提交任務的線程本身來執行這個任務。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
}
// 無論怎樣,直接拋出 RejectedExecutionException 異常
// 這個是默認的策略,若是咱們構造線程池的時候不傳相應的 handler 的話,那就會指定使用這個
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
}
// 不作任何處理,直接忽略掉這個任務
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
// 這個相對霸道一點,若是線程池沒有被關閉的話,
// 把隊列隊頭的任務(也就是等待了最長時間的)直接扔掉,而後提交這個任務到等待隊列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
}
到這裏,ThreadPoolExecutor 的源碼算是分析結束了。單純從源碼的難易程度來講,ThreadPoolExecutor 的源碼還算是比較簡單的,只是須要咱們靜下心來好好看看罷了。
這節其實也不是分析 Executors 這個類,由於它僅僅是工具類,它的全部方法都是 static 的。
生成一個固定大小的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
最大線程數設置爲與核心線程數相等,此時 keepAliveTime 設置爲 0(由於這裏它是沒用的,即便不爲 0,線程池默認也不會回收 corePoolSize 內的線程),任務隊列採用 LinkedBlockingQueue,無界隊列。
過程分析:剛開始,每提交一個任務都建立一個 worker,當 worker 的數量達到 nThreads 後,再也不建立新的線程,而是把任務提交到 LinkedBlockingQueue 中,並且以後線程數始終爲 nThreads。
生成只有一個線程的固定線程池,這個更簡單,和上面的同樣,只要設置線程數爲 1 就能夠了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
生成一個須要的時候就建立新的線程,同時能夠複用以前建立的線程(若是這個線程當前沒有任務)的線程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
核心線程數爲 0,最大線程數爲 Integer.MAX_VALUE,keepAliveTime 爲 60 秒,任務隊列採用 SynchronousQueue。
這種線程池對於任務能夠比較快速地完成的狀況有比較好的性能。若是線程空閒了 60 秒都沒有任務,那麼將關閉此線程並從線程池中移除。因此若是線程池空閒了很長時間也不會有問題,由於隨着全部的線程都會被關閉,整個線程池不會佔用任何的系統資源。
過程分析:我把 execute 方法的主體黏貼過來,讓你們看得明白些。鑑於 corePoolSize 是 0,那麼提交任務的時候,直接將任務提交到隊列中,因爲採用了 SynchronousQueue,因此若是是第一個任務提交的時候,offer 方法確定會返回 false,由於此時沒有任何 worker 對這個任務進行接收,那麼將進入到最後一個分支來建立第一個 worker。以後再提交任務的話,取決因而否有空閒下來的線程對任務進行接收,若是有,會進入到第二個 if 語句塊中,不然就是和第一個任務同樣,進到最後的 else if 分支。
int c = ctl.get();
// corePoolSize 爲 0,因此不會進到這個 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get();
}
// offer 若是有空閒線程恰好能夠接收此任務,那麼返回 true,不然返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
SynchronousQueue 是一個比較特殊的 BlockingQueue,其自己不儲存任何元素,它有一個虛擬隊列(或虛擬棧),無論讀操做仍是寫操做,若是當前隊列中存儲的是與當前操做相同模式的線程,那麼當前操做也進入隊列中等待;若是是相反模式,則配對成功,從當前隊列中取隊頭節點。具體的信息,能夠看個人另外一篇關於 BlockingQueue 的文章。
我一貫不喜歡寫總結,由於我把全部須要表達的都寫在正文中了,寫小篇幅的總結並不能真正將話說清楚,本文的總結部分爲準備面試的讀者而寫,但願能幫到面試者或者沒有足夠的時間看徹底文的讀者。
java 線程池有哪些關鍵屬性?
corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler
corePoolSize 到 maximumPoolSize 之間的線程會被回收,固然 corePoolSize 的線程也能夠經過設置而獲得回收(allowCoreThreadTimeOut(true))。
workQueue 用於存聽任務,添加任務的時候,若是當前線程數超過了 corePoolSize,那麼往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
keepAliveTime 用於設置空閒時間,若是線程數超出了 corePoolSize,而且有些線程的空閒時間超過了這個值,會執行關閉這些線程的操做
rejectedExecutionHandler 用於處理當線程池不能執行此任務時的狀況,默認有拋出 RejectedExecutionException 異常、忽略任務、使用提交任務的線程來執行此任務和將隊列中等待最久的任務刪除,而後提交此任務這四種策略,默認爲拋出異常。
說說線程池中的線程建立時機?
若是當前線程數少於 corePoolSize,那麼提交任務的時候建立一個新的線程,並由這個線程執行這個任務;
若是當前線程數已經達到 corePoolSize,那麼將提交的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
若是隊列已滿,那麼建立新的線程來執行任務,須要保證池中的線程數不會超過 maximumPoolSize,若是此時線程數超過了 maximumPoolSize,那麼執行拒絕策略。
Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構造出來的線程池有什麼差異?
細說太長,往上滑一點點,在 Executors 的小節進行了詳盡的描述。
任務執行過程當中發生異常怎麼處理?
若是某個任務執行出現異常,那麼執行任務的線程會被關閉,而不是繼續接收其餘任務。而後會啓動一個新的線程來代替它。
何時會執行拒絕策略?
workers 的數量達到了 corePoolSize(任務此時須要進入任務隊列),任務入隊成功,與此同時線程池被關閉了,並且關閉線程池並無將這個任務出隊,那麼執行拒絕策略。這裏說的是很是邊界的問題,入隊和關閉線程池併發執行,讀者仔細看看 execute 方法是怎麼進到第一個 reject(command) 裏面的。
workers 的數量大於等於 corePoolSize,將任務加入到任務隊列,但是隊列滿了,任務入隊失敗,那麼準備開啓新的線程,但是線程數已經達到 maximumPoolSize,那麼執行拒絕策略。
由於本文實在太長了,因此我沒有說執行結果是怎麼獲取的,也沒有說關閉線程池相關的部分,這個就留給讀者吧。
本文篇幅是有點長,若是讀者發現什麼不對的地方,或者有須要補充的地方,請不吝提出,謝謝。
(全文完)