能夠說,線程池是Java併發場景中應用到的最多併發框架了。幾乎全部須要異步或者併發執行的任務程序均可以使用線程池。在開發過程當中,合理的使用線程池會帶來如下3個好處:前端
世界上沒有完美無瑕的事情,任何事情都有正反兩面。若是濫用線程池或者使用不當,也有可能帶來安全隱患。所以,必須合理的使用線程池才能使得收益最大化。接下來,咱們就來系統的瞭解線程池,以便可以達到「合理使用」的境界。java
當任務提交到線程池以後,線程池是如何處理這些任務的呢?它是這樣處理的的:數據庫
從以上圖中,能夠看見,當提交了一個新任務時,線程池的處理過程以下:安全
以上2/3步驟是否能夠調換順序呢?實際上,線程池之因此採用以上的設計思路,是由於,1/3步驟都是要獲取全局鎖的。若是任務頻繁提交執行,此時將加重鎖的競爭,而2步驟是不須要額外的全局鎖的競爭。
帶着以上的認知,咱們來剖析一下源碼:架構
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 首先獲取ctl變量的值(ctl變量及其重要,在這裏咱們只需知道它能夠同時表明線程數和線程池狀態便可)
int c = ctl.get();
// 基於ctl計算出當前的線程數量,若是小於設定的核心線程數。
if (workerCountOf(c) < corePoolSize) {
// 則經過addWorker建立一個線程並執行當前任務,addWorker方法內部須要獲取全局鎖
if (addWorker(command, true))
return;
// 若是addWorker返回失敗標誌,則從新獲取當前ctl的值。
c = ctl.get();
}
// 基於ctl獲取當前線程池狀態,若是是RUNNING狀態而且任務添加到隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 則從新檢查線程池的狀態,若是當前不是RUNNING狀態了,則移除當前任務。
if (! isRunning(recheck) && remove(command))
若是成功,則執行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0) // 若是當前線程數爲0
// 則經過addWorker方法初始化ctl
addWorker(null, false);
}
// 若是失敗,則經過addWorker方法建立新的線程來執行任務,
// 若是addWorker方法返回false標誌,說明此時建立新的線程來執行任務失敗了。
// 此時說明線程池已滿,或者線程池已經不是RUNNING狀態了。
else if (!addWorker(command, false))
// 此時將執行拒絕策略
reject(command);
}
複製代碼
經過以上源碼解析,可以清晰的瞭解一個任務提交到線程池是如何處理的了。源碼中有兩個重要的地方尚未講解。一是ctl變量的做用;二是addWorker方法的解析。這兩個點能夠說是線程池的精髓所在了。併發
ctl變量是一個AtomicInteger類型,它包含了兩個概念:框架
In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable. If this is ever an issue in the future, the variable can be changed to be an AtomicLong, and the shift/mask constants below adjusted. But until the need arises, this code is a bit faster and simpler using an int.異步
以上大概意思就是:爲了讓有效線程數和線程池的狀態可以用一個int變量表示,將線程數限制在了2^29-1(約爲5億),這樣的話就能夠用低29位來表示有效線程數,高3位來表示線程的狀態。爲了更好的理解,咱們直接上源碼:函數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
基於源碼,能夠得出它們的實際值:
COUNT_BITS = Integer.SIZE - 3;即:COUNT_BITS = 32 -3 = 29;
CAPACITY = (1 << 29) - 1;即:CAPACITY = 2^29 - 1 約等於5億;
RUNNING = -1 << 29; --> 高3位爲:111
SHUTDOWN = 0 << 29; --> 高3位爲:000
STOP = 1 << 29; --> 高3位爲:001
TIDYING = 2 << 29; --> 高3位爲:010
TERMINATED= 3 << 29; --> 高3位爲:011性能
有了以上的計算,咱們再來看:
runStateO()方法是獲取當前線程池狀態的方法,它的計算公式爲: ctl & ~ CAPACITY.
~ CAPACITY獲取到的值實際上就是高3位爲1,低29位爲0. 所以 ctl & ~ CAPACITY 獲得的實際上就是ctl高3位的值。
同理,workerCountOf()方法獲取到的實際上就是ctl低29位的值。表示爲當前有效的線程數。
經過以上解析,應該就能夠理解ctl變量的含義了!
在線程池中,線程並非Thread,而是基於Thread包裝成了一個Worker。Worker是ThreadPoolExecutor的一個內部類。通所以,addWorker()方法實際上就是基於當前線程池的狀態來決定是否構建Worker並執行。Worker執行萬當前任務後,並不會直接退出,而是循環獲取隊列中的任務來執行,從源碼中咱們能證實這個結論:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 若是task不爲空,則執行task。
// rugo task爲空,則從隊列中繼續獲取task。
while (task != null || (task = getTask()) != null) {
w.lock();
// 判斷線程池的狀態和當前task的中斷標誌,是否知足繼續執行的條件。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子函數:執行task以前的鉤子函數
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子函數:執行task以後調用的鉤子函數
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
從源碼中,咱們能夠得知,ThreadPoolExecutor框架預留了幾個鉤子函數.worker在執行任務的過程當中,會觸發鉤子函數,若是咱們須要執行一些特殊的業務(好比統計任務的執行時長),則能夠繼承ThreadPoolExecutor實現鉤子函數來達到特定業務的目的。
鉤子函數 | 說明 |
---|---|
beforeExecute() | 任務執行前觸發 |
afterExecute() | 任務執行後觸發 |
terminated() | 當線程池狀態變成TIDYING時,會觸發此方法 |
線程池一共有5種狀態。
狀態 | 解釋 |
---|---|
RUNNING | 運行狀態。當線程池建立成功後,線程池的狀態就是RUNNING狀態了。此狀態下能夠接收新的任務和執行隊列中的任務 |
SHUTDOWN | 此狀態下,線程池再也不接收新的任務了,可是會將執行中的任務和隊列中的任務執行完成。 |
STOP | 此狀態下,線程池再也不接收新的任務;再也不執行隊列中的任務;會中斷正在執行中的任務。 |
TIDYING | 此狀態下,線程池中的workCount=0,線程池將調用調用terminated()方法。 |
TERMINATED | 當terminated()方法執行完成以後,線程池狀態將變成TERMINATED,至此,線程池的生命週期完成。 |
線程池狀態的流轉:
TIP:線程池狀態的流轉和線程狀態的流轉是徹底不同的概念。
基於以上介紹,咱們對線程池的原理已經瞭然於胸。接下來,經過一個例子來看看實戰中線程池的使用方式和技巧。
建立線程池的方式有不少種。好比:
// 1.建立固定大小的線程池
Executors.newFixedThreadPool();
// 2.建立一個基於SynchronousQueue隊列的線程池.(此隊列的特性在前文是有解析)
Executors.newCachedThreadPool();
// 3.建立一個具備延時功能的線程池(實際上就是基於DelayQueue實現,此隊列在前文中解析)
Executors.newScheduledThreadPool();
// 4.建立一個只有一個線程的線程池
Executors.newSingleThreadExecutor();
複製代碼
以上是快捷建立線程池的的4種方式,其實若是咱們再深刻理解一波的話,能夠發現其實他們底層都是基於ThreadPoolExecutor提供的構造方法構建的線程池。這4種方式建立出來的線程池都具有必定的特性在裏面,若是對於隊列理解透徹的話,能夠發現,它們的本質其實就是選擇的隊列不一樣。從而能夠基於隊列提供的特色實現特殊的功能。
在實戰中,除非應用場景比較簡單,任務量不是很大的狀況下,咱們能夠採用這種快捷建立線程池的方式。但若是咱們在大中型工程中,則最好基於ThreadPoolExecutor自定義建立線程池,這樣能夠更加貼切實際的場景使用。
ThreadPoolExecutor提供瞭如下幾個構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
複製代碼
能夠發現,經過原生構造器來構建線程池,是最靈活的。所以,在《阿里巴巴Java開發規範》中也強烈建議採用這種方式來建立線程池。每一個參數的含義以下:
參數名 | 含義 |
---|---|
corePoolSize | 核心線程數 |
maximumPoolSize | 最大線程數 |
keepAliveTime | 當線程數大於核心線程數,多餘的線程的存活時長,和unit配合使用 |
unit | 和keepAliveTime配合使用 |
workQueue | 工做隊列。當核心線程數滿時,任務會先入隊。若無特殊要求,應該儘可能選擇有界隊列!不然當任務激增時,有可能撐爆內存致使性能降低甚至崩潰。 |
threadFactory | 線程工廠類,用於建立線程。若是沒有指定,則採用默認的工廠類 |
handler | 任務拒絕策略。當線程池不能再接收新的任務時,將執行任務拒絕策略。若是沒有指定,則採用默認的拒絕策略。 |
當線程池滿了以後,將會執行拒絕策略。線程池提供了4種拒絕策略。以下:
拒絕策略 | 說明 |
---|---|
AbortPolicy | 當線程池滿時,將會拒絕接收新任務,並拋出RejectedExecutionException。若是沒有指定拒絕策略,此爲默認策略 |
CallerRunsPolicy | 當線程池滿時,將會使用任務提交者所在的線程來執行這個任務。線程池自己會丟棄掉這個任務。 |
DiscardPolicy | 當線程池滿時,將會默默地丟棄掉這個任務。tips:在實際開發當中,若是任務不影響業務,則能夠採用此策略,不然斷然不可採起這個策略。 |
DiscardOldestPolicy | 當線程池滿時,將會默默地丟棄掉隊列最前端的任務。而後執行提交的任務。tips:和以上同樣 |
關閉線程池是頗有必要的。當應用程序須要退出時,能夠經過註冊回調函數來關閉線程池。若是咱們暴力關閉應用程序的話,會致使正在執行的任務和隊列中的任務丟失。在企業工程中,這一點千萬注意。 線程池提供了兩種關閉方法:
關閉方式 | 說明 |
---|---|
shutdown() | 當調用此方法時,線程池狀態會變成SHUTDOWN狀態,此時線程池將不會再接收新的任務,但以及接收的任務會執行完畢。 |
shutdownNow() | 當調用此方法時,線程池狀態會變成STOP狀態,此時線程池將不會再接收新的任務,而且會中斷全部正在執行中的任務以及丟棄掉隊列中的任務。 |
在實際工程中,具體採起哪一種方式,應該根據實際狀況來抉擇。若是任務對業務有影響,則應當選擇shutdown(),不然能夠視狀況選擇shutdownNow()。
在Java應用中,線程屬於稀有資源。那麼線程數是設置的越大越好麼?非也。在計算機體系中,若是想讓性能發揮極致,應該是各個子系統之間的合理配置使用。對於線程數而言也是如此。要想合理的設置線程數,就必須首先分析人物的特性。能夠從如下幾個角度來分析:
咱們能夠根據任務的不一樣特性來綜合考慮線程數的設置。通常而言。若是是CPU密集型,則應該分配儘量小的線程數:一般狀況下,能夠設置爲CPU核數 + 1;若是是IO密集型,則線程並不老是在執行任務,則應該分配儘量大的線程數:一般狀況下,能夠設置爲2 * CPU核數;若是是混合型,則能夠將任務拆分紅一個CPU密集型和一個IO密集型,只要兩個任務執行的時間不會相差太大,則性能會比串行執行的效率要高,若是拆分後任務執行相差的時間過大,則沒有必要拆分。
經過以上的介紹,如何用好線程池應該不是問題。對於一個完善的應用而言,應當還要有良好的監控能力,以便在任務執行出現問題時,能夠快速的定位、分析、解決問題。
ThreadPoolExecutor提供了一些基本且好用的方法來監控線程池的運行狀況:
方法名 | 說明 |
---|---|
getTaskCount() | 線程池隊列中須要執行的任務數量 |
getCompletedTaskCount() | 線程池中已經執行完成的任務數量 |
getActiveCount() | 線程池中正在執行任務的線程數量 |
若是咱們想要更加全面的監控線程池的運行狀態以及任務的執行過程。能夠繼承ThreadPoolExecutor來自定義線程池。
本篇文章圍繞ThreadPoolExecutor,系統介紹了線程池的實現;以及實際項目中如何正確的使用線程池。經過本篇文章的寫做,本身對於線程池的認識有多了一些不同的感受。好比clt變量的設計真的很精妙。像Doug Lea大神致以崇高的敬意!