咱們通常不會選擇直接使用線程類Thread
進行多線程編程,而是使用更方便的線程池來進行任務的調度和管理。線程池就像共享單車,咱們只要在咱們有須要的時候去獲取就能夠了。甚至能夠說線程池更棒,咱們只須要把任務提交給它,它就會在合適的時候運行了。可是若是直接使用Thread
類,咱們就須要在每次執行任務時本身建立、運行、等待線程了,並且很難對線程進行總體的管理,這可不是一件輕鬆的事情。既然咱們已經有了線程池,那仍是把這些麻煩事交給線程池來處理吧。java
這篇文章將會從線程池的概念與通常使用入手,首先讓你們能夠了解線程池的基本使用方法,以後會介紹實踐中最經常使用的四種線程池。最後,咱們會經過對JDK源代碼的剖析深刻了解線程池的運行過程和具體設計,真正達到知其然而知其因此然的水平。雖然只要瞭解了API就能夠知足通常的平常使用了,可是隻有當咱們真正釐清了多線程相關的知識點,才能在面對多線程的實踐與面試問題時作到遊刃有餘、胸有成竹。面試
本文是一系列多線程文章中的第三篇,主要講解了線程池相關的知識,這個系列總共有十篇文章,前五篇暫定結構以下,感興趣的讀者能夠關注一下:編程
通常咱們最經常使用的線程池實現類是ThreadPoolExecutor
,咱們接下來會介紹這個類的基本使用方法。JDK已經對線程池作了比較好的封裝,相信這個過程會很是輕鬆。數組
既然線程池是一個Java類,那麼最直接的使用方法必定是new一個ThreadPoolExecutor
類的對象,例如ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() )
。那麼這個構造器的裏每一個參數是什麼意思呢?緩存
下面就是這個構造器的方法簽名:安全
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
複製代碼
各個參數分別表示下面的含義:bash
keepAliveTime
是時間數量,unit
是時間單位,單位加數量組成了最終的超時時間。這個超時時間表示若是線程池中包含了超過corePoolSize
數量的線程,則在有線程空閒的時間超過了超時時間時該線程就會被銷燬;線程池中的阻塞隊列專門用於存放待執行的任務,在ThreadPoolExecutor
中一個任務能夠經過兩種方式被執行:第一種是直接在建立一個新的Worker時被做爲第一個任務傳入,由這個新建立的線程來執行;第二種就是把任務放入一個阻塞隊列,等待線程池中的工做線程撈取任務進行執行。數據結構
上面提到的阻塞隊列是這樣的一種數據結構,它是一個隊列(相似於一個List),能夠存放0到N個元素。咱們能夠對這個隊列進行插入和彈出元素的操做,彈出操做能夠理解爲是一個獲取並從隊列中刪除一個元素的操做。當隊列中沒有元素時,對這個隊列的獲取操做將會被阻塞,直到有元素被插入時纔會被喚醒;當隊列已滿時,對這個隊列的插入操做將會被阻塞,直到有元素被彈出後纔會被喚醒。這樣的一種數據結構很是適合於線程池的場景,當一個工做線程沒有任務可處理時就會進入阻塞狀態,直到有新任務提交後才被喚醒。多線程
當建立了一個線程池以後咱們就能夠將任務提交到線程池中執行了。提交任務到線程池中至關簡單,咱們只要把原來傳入Thread
類構造器的Runnable
對象傳入線程池的execute
方法或者submit
方法就能夠了。execute
方法和submit
方法基本沒有區別,二者的區別只是submit
方法會返回一個Future
對象,用於檢查異步任務的執行狀況和獲取執行結果(異步任務完成後)。併發
咱們能夠先試試如何使用比較簡單的execute
方法,代碼例子以下:
public class ThreadPoolTest {
private static int count = 0;
public static void main(String[] args) throws Exception {
Runnable task = new Runnable() {
public void run() {
for (int i = 0; i < 1000000; ++i) {
synchronized (ThreadPoolTest.class) {
count += 1;
}
}
}
};
// 重要:建立線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// 重要:向線程池提交兩個任務
threadPool.execute(task);
threadPool.execute(task);
// 等待線程池中的全部任務完成
threadPool.shutdown();
while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) {
System.out.println("Not yet. Still waiting for termination");
}
System.out.println("count = " + count);
}
}
複製代碼
上面的代碼中爲了等待線程池中的全部任務執行完已經使用了shutdown()
方法,關閉線程池的方法主要有兩個:
shutdown()
,有序關閉線程池,調用後線程池會讓已經提交的任務完成執行,可是不會再接受新任務。shutdownNow()
,直接關閉線程池,線程池中正在運行的任務會被中斷,正在等待執行的任務不會再被執行,可是這些還在阻塞隊列中等待的任務會被做爲返回值返回。咱們能夠經過調用線程池對象上的一些方法來獲取線程池當前的運行信息,經常使用的方法有:
不少狀況下咱們也不會直接建立ThreadPoolExecutor
類的對象,而是根據須要經過Executors
的幾個靜態方法來建立特定用途的線程池。目前經常使用的線程池有四種:
Executors.newCachedThreadPool
方法建立Executors.newFixedThreadPool
方法建立Executors.newScheduledThreadPool
方法建立Executors.newSingleThreadExecutor
方法建立下面經過這些靜態方法的源碼來具體瞭解一下不一樣類型線程池的特性與適用場景。
JDK中的源碼咱們經過在IDE中進行跳轉能夠很方便地進行查看,下面就是Executors.newCachedThreadPool
方法中的源代碼。從代碼中咱們能夠看到,可緩存線程池其實也是經過直接建立ThreadPoolExecutor
類的構造器建立的,只是其中的參數都已經被設置好了,咱們能夠不用作具體的設置。因此咱們要觀察的重點就是在這個方法中具體產生了一個怎樣配置的ThreadPoolExecutor
對象,以及這樣的線程池適用於怎樣的場景。
從下面的代碼中,咱們能夠看到,傳入ThreadPoolExecutor
構造器的值有: - corePoolSize核心線程數爲0,表明線程池中的線程數能夠爲0 - maximumPoolSize最大線程數爲Integer.MAX_VALUE,表明線程池中最多能夠有無限多個線程 - 超時時間設置爲60秒,表示線程池中的線程在空閒60秒後會被回收 - 最後傳入的是一個SynchronousQueue
類型的阻塞隊列,表明每個新添加的任務都要立刻有一個工做線程進行處理
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
因此可緩存線程池在添加任務時會優先使用空閒的線程,若是沒有就建立一個新線程,線程數沒有上限,因此每個任務都會立刻被分配到一個工做線程進行執行,不須要在阻塞隊列中等待;若是線程池長期閒置,那麼其中的全部線程都會被銷燬,節約系統資源。
傳入ThreadPoolExecutor
構造器的值有:
nThreads
,即線程池中的線程數量會保持在nThreads
,因此被稱爲「定長線程池」public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
定長線程池中的線程數會逐步增加到nThreads個,而且在以後空閒線程不會被釋放,線程數會一直保持在nThreads
個。若是添加任務時全部線程都處於忙碌狀態,那麼就會把任務添加到阻塞隊列中等待執行,阻塞隊列中任務的總數沒有上限。
與以前的兩個方法不一樣,Executors.newScheduledThreadPool
返回的是ScheduledExecutorService
接口對象,能夠提供延時執行、定時執行等功能。在線程池配置上有以下特色:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
複製代碼
延時任務線程池實現了ScheduledExecutorService
接口,主要用於須要延時執行和定時執行的狀況。
單線程線程池中只有一個工做線程,能夠保證添加的任務都以指定順序執行(先進先出、後進先出、優先級)。可是若是線程池裏只有一個線程,爲何咱們還要用線程池而不直接用Thread
呢?這種狀況下主要有兩種優勢:一是咱們能夠經過共享的線程池很方便地提交任務進行異步執行,而不用本身管理線程的生命週期;二是咱們可使用任務隊列並指定任務的執行順序,很容易作到任務管理的功能。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
經過前面的內容咱們其實已經能夠在代碼中使用線程池了,可是咱們爲何還要去深究線程池的內部實現呢?首先,可能有一個很功利性的目的就是爲了面試,在面試時若是能準確地說出一些底層的運行機制與原理那必定能夠成爲過程當中一個重要的亮點。
可是我認爲學習探究線程池的內部實現的做用絕對不只是如此,只有深刻了解並釐清了線程池的具體實現,咱們才能解決實踐中須要考慮的各類邊界條件。由於多線程編程所表明的併發編程並非一個固定的知識點,而是實踐中不斷在發展和完善的一個知識門類。咱們也許會須要同時考慮多個維度,最後獲得一個特定於應用場景的解決方案,這就要求咱們具有從細節着手構建出解決方案並作好各個考慮維度之間的取捨的能力。
並且我相信只要在某一個點上能突破到至關的深度,那麼之後從這個點上向外擴展就會容易得多。也許在剛開始咱們的探究會碰到很是大的阻力,可是咱們要相信,最後咱們能夠獲得的將不止是一個知識點而是一整個知識面。
在IDE中,例如IDEA裏,咱們能夠點擊咱們樣例代碼裏的ThreadPoolExecutor
類跳轉到JDK中ThreadPoolExecutor
類的源代碼。在源代碼中咱們能夠看到不少java.util.concurrent
包的締造者大牛「Doug Lea」所留下的各類註釋,下面的圖片就是該類源代碼的一個截圖。
這些註釋的內容很是有參考價值,建議有能力的讀者朋友能夠本身閱讀一遍。下面,咱們就一步步地抽絲剝繭,來揭開線程池類ThreadPoolExecutor
源代碼的神祕面紗。
在ThreadPoolExecutor
類定義的開頭,咱們能夠看到以下的幾行代碼:
// 控制變量,前3位表示狀態,剩下的數據位表示有效的線程數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的位數減去3位狀態位就是線程數的位數
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是線程數的上限(含),即2^COUNT_BITS - 1個
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
複製代碼
第一行是一個用來做爲控制變量的整型值,即一個Integer。之因此要用AtomicInteger
類是由於要保證多線程安全,在本系列以後的文章中會對AtomicInteger
進行具體介紹。一個整型通常是32位,可是這裏的代碼爲了保險起見,仍是使用了Integer.SIZE
來表示整型的總位數。這裏的「位」指的是數據位(bit),在計算機中,8bit = 1字節,1024字節 = 1KB,1024KB = 1MB。每一位都是一個0或1的數字,咱們若是把整型想象成一個二進制(0或1)的數組,那麼一個Integer就是32個數字的數組。其中,前三個被用來表示狀態,那麼咱們就能夠表示2^3 = 8個不一樣的狀態了。剩下的29位二進制數字都會被用於表示當前線程池中有效線程的數量,上限就是(2^29 - 1)個,即常量CAPACITY
。
以後的部分列出了線程池的全部狀態:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
在這裏能夠忽略數字後面的<< COUNT_BITS
,能夠把狀態簡單地理解爲前面的數字部分,這樣的簡化基本不影響結論。
各個狀態的解釋以下:
terminated()
方法terminated()
方法調用完成後進入這幾個狀態所對應的數字值是按照順序排列的,也就是說線程池的狀態只能從小到大變化,這也方便了經過數字比較來判斷狀態所在的階段,這種經過數字大小來比較狀態值的方法在ThreadPoolExecutor
的源碼中會有大量的使用。
下圖是這五個狀態之間的變化過程:
shutdown()
方法被直接調用,或者在線程池對象被GC回收時經過finalize()
方法隱式調用了shutdown()
方法時,線程池會進入SHUTDOWN狀態。該狀態下線程池仍然會繼續執行完阻塞隊列中的任務,只是再也不接受新的任務了。當隊列中的任務被執行完後,線程池中的線程也會被回收。當隊列和線程都被清空後,線程池將進入TIDYING狀態;shutdownNow()
方法,則線程池會進入STOP狀態。在STOP狀態下,線程池會直接清空阻塞隊列中待執行的任務,而後中斷全部正在進行中的任務並回收線程。當線程都被清空之後,線程池就會進入TIDYING狀態;terminated()
方法,該方法執行完後,線程池就會進入最終的TERMINATED狀態,完全結束。到這裏咱們就已經清楚地瞭解了線程從剛被建立時的RUNNING狀態一直到最終的TERMINATED狀態的整個生命週期了。那麼當咱們要向一個RUNNING狀態的線程池提交任務時會發生些什麼呢?
咱們通常會使用execute
方法提交咱們的任務,那麼線程池在這個過程當中作了什麼呢?在ThreadPoolExecutor
類的execute()
方法的源代碼中,咱們主要作了四件事:
addWorker
方法中的第一個參數是該線程的第一個任務,而第二個參數就是表明是否建立的是核心線程,在execute
方法中addWorker
總共被調用了三次,其中第一次傳入的是true,後兩次傳入的都是false;workQueue.offer()
方法將任務添加到阻塞隊列中等待執行;整體上的執行流程以下,下方的黑色同心圓表明流程結束:
這裏再重複一次阻塞隊列的定義,方便你們閱讀:線程池中的阻塞隊列專門用於存放待執行的任務,在
ThreadPoolExecutor
中一個任務能夠經過兩種方式被執行:第一種是直接在建立一個新的Worker時被做爲第一個任務傳入,由這個新建立的線程來執行;第二種就是把任務放入一個阻塞隊列,等待線程池中的工做線程撈取任務進行執行。
上面提到的阻塞隊列是這樣的一種數據結構,它是一個隊列(相似於一個List),能夠存放0到N個元素。咱們能夠對這個隊列進行插入和彈出元素的操做,彈出操做能夠理解爲是一個獲取並從隊列中刪除一個元素的操做。當隊列中沒有元素時,對這個隊列的獲取操做將會被阻塞,直到有元素被插入時纔會被喚醒;當隊列已滿時,對這個隊列的插入操做將會被阻塞,直到有元素被彈出後纔會被喚醒。這樣的一種數據結構很是適合於線程池的場景,當一個工做線程沒有任務可處理時就會進入阻塞狀態,直到有新任務提交後才被喚醒。
下面是帶有註釋的源代碼,你們能夠和上面的流程對照起來參考一下:
public void execute(Runnable command) {
// 檢查提交的任務是否爲空
if (command == null)
throw new NullPointerException();
// 獲取控制變量值
int c = ctl.get();
// 檢查當前線程數是否達到了核心線程數
if (workerCountOf(c) < corePoolSize) {
// 未達到核心線程數,則建立新線程
// 並將傳入的任務做爲該線程的第一個任務
if (addWorker(command, true))
// 添加線程成功則直接返回,不然繼續執行
return;
// 由於前面調用了耗時操做addWorker方法
// 因此線程池狀態有可能發生了改變,從新獲取狀態值
c = ctl.get();
}
// 判斷線程池當前狀態是不是運行中
// 若是是則調用workQueue.offer方法將任務放入阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
// 由於執行了耗時操做「放入阻塞隊列」,因此從新獲取狀態值
int recheck = ctl.get();
// 若是當前狀態不是運行中,則將剛纔放入阻塞隊列的任務拿出,若是拿出成功,則直接拒絕這個任務
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 若是線程池中沒有線程了,那就建立一個
addWorker(null, false);
}
// 若是放入阻塞隊列失敗(如隊列已滿),則添加一個線程
else if (!addWorker(command, false))
// 若是添加線程失敗(如已經達到了最大線程數),則拒絕任務
reject(command);
}
複製代碼
在前面execute
方法的代碼中咱們能夠看到線程池是經過addWorker
方法來向線程池中添加新線程的,那麼新的線程又是如何運行起來的呢?
這裏咱們暫時跳過addWorker
方法的詳細源代碼,由於雖然這個方法的代碼行數較多,可是功能相對比較直接,只是建立一個表明線程的Worker
類對象,並調用這個對象所對應線程對象的start()
方法。咱們知道一旦調用了Thread
類的start()
方法,則這個線程就會開始調用建立線程時傳入的Runnable
對象。從下面的Worker
類構造器源代碼能夠看出,Worker
類正是把本身(this指針)傳入了線程的構造器當中,那麼這個線程就會運行Worker
類的run()
方法了,這個run()
方法只執行了一行很簡單的代碼runWorker(this);
。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
複製代碼
咱們看到線程池中的線程在啓動時會調用對應的Worker
類的runWorker
方法,而這裏就是整個線程池任務執行的核心所在了。runWorker
方法中包含有一個相似無限循環的while語句,讓worker對象能夠不斷執行提交到線程池中的新任務。
你們能夠配合代碼上帶有的註釋來理解該方法的具體實現:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 將worker的狀態重置爲正常狀態,由於state狀態值在構造器中被初始化爲-1
w.unlock();
// 經過completedAbruptly變量的值判斷任務是否正常執行完成
boolean completedAbruptly = true;
try {
// 若是task爲null就經過getTask方法獲取阻塞隊列中的下一個任務
// getTask方法通常不會返回null,因此這個while相似於一個無限循環
// worker對象就經過這個方法的持續運行來不斷處理新的任務
while (task != null || (task = getTask()) != null) {
// 每一次任務的執行都必須獲取鎖來保證下方臨界區代碼的線程安全
w.lock();
// 若是狀態值大於等於STOP(狀態值是有序的,即STOP、TIDYING、TERMINATED)
// 且當前線程尚未被中斷,則主動中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 開始
try {
// 執行任務前處理操做,默認是一個空實現
// 在子類中能夠經過重寫來改變任務執行前的處理行爲
beforeExecute(wt, task);
// 經過thrown變量保存任務執行過程當中拋出的異常
// 提供給下面finally塊中的afterExecute方法使用
Throwable thrown = null;
try {
// *** 重要:實際執行任務的代碼
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 由於Runnable接口的run方法中不能拋出Throwable對象
// 因此要包裝成Error對象拋出
thrown = x; throw new Error(x);
} finally {
// 執行任務後處理操做,默認是一個空實現
// 在子類中能夠經過重寫來改變任務執行後的處理行爲
afterExecute(task, thrown);
}
} finally {
// 將循環變量task設置爲null,表示已處理完成
task = null;
// 累加當前worker已經完成的任務數
w.completedTasks++;
// 釋放while體中第一行獲取的鎖
w.unlock();
}
}
// 將completedAbruptly變量設置爲false,表示任務正常處理完成
completedAbruptly = false;
} finally {
// 銷燬當前的worker對象,並完成一些諸如完成任務數量統計之類的輔助性工做
// 在線程池當前狀態小於STOP的狀況下會建立一個新的worker來替換被銷燬的worker
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
在runWorker
方法的源代碼中有兩個比較重要的方法調用,一個是while條件中對getTask
方法的調用,一個是在方法的最後對processWorkerExit
方法的調用。下面是對這兩個方法更詳細的解釋。
getTask
方法在阻塞隊列中有待執行的任務時會從隊列中彈出一個任務並返回,若是阻塞隊列爲空,那麼就會阻塞等待新的任務提交到隊列中直到超時(在一些配置下會一直等待而不超時),若是在超時以前獲取到了新的任務,那麼就會將這個任務做爲返回值返回。
當getTask
方法返回null時會致使當前Worker退出,當前線程被銷燬。在如下狀況下getTask
方法纔會返回null:
setMaximumPoolSize
修改了最大線程數而致使的結果;processWorkerExit
方法會銷燬當前線程對應的Worker對象,並執行一些累加總處理任務數等輔助操做。但在線程池當前狀態小於STOP的狀況下會建立一個新的Worker來替換被銷燬的Worker,有興趣的讀者能夠自行參考processWorkerExit
方法源代碼。
到這裏咱們的線程池源代碼之旅就結束了,但願你們在看完這篇文章以後能對線程池的使用和運行都有一個大概的印象。爲何說只是有了一個大概的印象呢?由於我以爲不少沒有相關基礎的讀者讀到這裏可能還只是對線程池有了一個本身的認識,對其中的一些細節可能尚未徹底捕捉到。因此我建議你們在看完下面的總結以後不妨再返回到文章的開頭多讀幾遍,相信第二遍的閱讀能給你們帶來不同的體驗,由於我本身也是在第三次讀ThreadPoolExecutor
類的源代碼時才真正打通了其中的一些重要關節的。
在這篇文章中咱們從線程池的概念和基本使用方法提及,而後介紹了ThreadPoolExecutor
的構造器參數和經常使用的四種具體配置。最後的一大半篇幅咱們一塊兒在TheadPoolExecutor
類的源代碼中暢遊了一番,瞭解了從線程池的建立到任務執行的完整執行模型。
在瀏覽ThreadPoolExexutor
源碼的過程當中,有幾個點咱們其實並無徹底說清楚,好比對鎖的加鎖操做、對控制變量的屢次獲取、控制變量的AtomicInteger類型。在下一篇文章中,我將會介紹這些以鎖、volatile變量、CAS操做、AQS抽象類爲表明的一系列線程同步方法,歡迎感興趣的讀者繼續關注我後續發佈的文章~