本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html
![]()
上節,咱們初步探討了Java併發包中的任務執行服務,實際中,任務執行服務的主要實現機制是線程池,本節,咱們就來探討線程池。java
線程池,顧名思義,就是一個線程的池子,裏面有若干線程,它們的目的就是執行提交給線程池的任務,執行完一個任務後不會退出,而是繼續等待或執行新任務。線程池主要由兩個概念組成,一個是任務隊列,另外一個是工做者線程,工做者線程主體就是一個循環,循環從隊列中接受任務並執行,任務隊列保存待執行的任務。git
線程池的概念相似於生活中的一些排隊場景,好比在火車站排隊購票、在醫院排隊掛號、在銀行排隊辦理業務等,通常都由若干個窗口提供服務,這些服務窗口相似於工做者線程,而隊列的概念是相似的,只是,在現實場景中,每一個窗口常常有一個單獨的隊列,這種排隊難以公平,隨着信息化的發展,愈來愈多的排隊場合使用虛擬的統一隊列,通常都是先拿一個排隊號,而後按號依次服務。github
線程池的優勢是顯而易見的:編程
Java併發包中線程池的實現類是ThreadPoolExecutor,它繼承自AbstractExecutorService,實現了ExecutorService,基本用法與上節介紹的相似,咱們就不贅述了。不過,ThreadPoolExecutor有一些重要的參數,理解這些參數對於合理使用線程池很是重要,接來下,咱們探討這些參數。swift
ThreadPoolExecutor有多個構造方法,都須要一些參數,主要構造方法有:數組
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
第二個構造方法多了兩個參數threadFactory和handler,這兩個參數通常不須要,第一個構造方法會設置默認值。微信
參數corePoolSize, maximumPoolSize, keepAliveTime, unit用於控制線程池中線程的個數,workQueue表示任務隊列,threadFactory用於對建立的線程進行一些配置,handler表示任務拒絕策略。下面咱們再來詳細探討下這些參數。多線程
線程池的大小主要與四個參數有關:併發
maximumPoolSize表示線程池中的最多線程數,線程的個數會動態變化,但這是最大值,無論有多少任務,都不會建立比這個值大的線程個數。
corePoolSize表示線程池中的核心線程個數,不過,這並非說,一開始就建立這麼多線程,剛建立一個線程池後,實際上並不會建立任何線程。
通常狀況下,有新任務到來的時候,若是當前線程個數小於corePoolSiz,就會建立一個新線程來執行該任務,須要說明的是,即便其餘線程如今也是空閒的,也會建立新線程。
不過,若是線程個數大於等於corePoolSiz,那就不會當即建立新線程了,它會先嚐試排隊,須要強調的是,它是"嘗試"排隊,而不是"阻塞等待"入隊,若是隊列滿了或其餘緣由不能當即入隊,它就不會排隊,而是檢查線程個數是否達到了maximumPoolSize,若是沒有,就會繼續建立線程,直到線程數達到maximumPoolSize。
keepAliveTime的目的是爲了釋放多餘的線程資源,它表示,當線程池中的線程個數大於corePoolSize時,額外空閒線程的存活時間,也就是說,一個非核心線程,在空閒等待新任務時,會有一個最長等待時間,即keepAliveTime,若是到了時間仍是沒有新任務,就會被終止。若是該值爲0,表示全部線程都不會超時終止。
這幾個參數除了能夠在構造方法中進行指定外,還能夠經過getter/setter方法進行查看和修改。
public void setCorePoolSize(int corePoolSize) public int getCorePoolSize() public int getMaximumPoolSize() public void setMaximumPoolSize(int maximumPoolSize) public long getKeepAliveTime(TimeUnit unit) public void setKeepAliveTime(long time, TimeUnit unit) 複製代碼
除了這些靜態參數,ThreadPoolExecutor還能夠查看關於線程和任務數的一些動態數字:
//返回當前線程個數
public int getPoolSize() //返回線程池曾經達到過的最大線程個數 public int getLargestPoolSize() //返回線程池自建立以來全部已完成的任務數 public long getCompletedTaskCount() //返回全部任務數,包括全部已完成的加上全部排隊待執行的 public long getTaskCount() 複製代碼
ThreadPoolExecutor要求的隊列類型是阻塞隊列BlockingQueue,咱們在76節介紹過多種BlockingQueue,它們均可以用做線程池的隊列,好比:
若是用的是無界隊列,須要強調的是,線程個數最多隻能達到corePoolSize,到達corePoolSize後,新的任務總會排隊,參數maximumPoolSize也就沒有意義了。
另外一面,對於SynchronousQueue,咱們知道,它沒有實際存儲元素的空間,當嘗試排隊時,只有正好有空閒線程在等待接受任務時,纔會入隊成功,不然,老是會建立新線程,直到達到maximumPoolSize。
若是隊列有界,且maximumPoolSize有限,則當隊列排滿,線程個數也達到了maximumPoolSize,這時,新任務來了,如何處理呢?此時,會觸發線程池的任務拒絕策略。
默認狀況下,提交任務的方法如execute/submit/invokeAll等會拋出異常,類型爲RejectedExecutionException。
不過,拒絕策略是能夠自定義的,ThreadPoolExecutor實現了四種處理方式:
它們都是ThreadPoolExecutor的public靜態內部類,都實現了RejectedExecutionHandler接口,這個接口的定義爲:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
複製代碼
當線程池不能接受任務時,調用其拒絕策略的rejectedExecution方法。
拒絕策略能夠在構造方法中進行指定,也能夠經過以下方法進行指定:
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) 複製代碼
默認的RejectedExecutionHandler是一個AbortPolicy實例,以下所示:
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
複製代碼
而AbortPolicy的rejectedExecution實現就是拋出異常,以下所示:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
複製代碼
咱們須要強調下,拒絕策略只有在隊列有界,且maximumPoolSize有限的狀況下才會觸發。
若是隊列無界,服務不了的任務老是會排隊,但這不見得是指望的,由於請求處理隊列可能會消耗很是大的內存,甚至引起內存不夠的異常。
若是隊列有界但maximumPoolSize無限,可能會建立過多的線程,佔滿CPU和內存,使得任何任務都難以完成。
因此,在任務量很是大的場景中,讓拒絕策略有機會執行是保證系統穩定運行很重要的方面。
線程池還能夠接受一個參數,ThreadFactory,它是一個接口,定義爲:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
複製代碼
這個接口根據Runnable建立一個Thread,ThreadPoolExecutor的默認實現是Executors類中的靜態內部類DefaultThreadFactory,主要就是建立一個線程,給線程設置一個名稱,設置daemon屬性爲false,設置線程優先級爲標準默認優先級,線程名稱的格式爲: pool-<線程池編號>-thread-<線程編號>。
若是須要自定義一些線程的屬性,好比名稱,能夠實現自定義的ThreadFactory。
線程個數小於等於corePoolSize時,咱們稱這些線程爲核心線程,默認狀況下:
不過,ThreadPoolExecutor有以下方法,能夠改變這個默認行爲。
//預先建立全部的核心線程
public int prestartAllCoreThreads() //建立一個核心線程,若是全部核心線程都已建立,返回false public boolean prestartCoreThread() //若是參數爲true,則keepAliveTime參數也適用於核心線程 public void allowCoreThreadTimeOut(boolean value) 複製代碼
類Executors提供了一些靜態工廠方法,能夠方便的建立一些預配置的線程池,主要方法有:
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newCachedThreadPool() 複製代碼
newSingleThreadExecutor基本至關於調用:
public static ExecutorService newSingleThreadExecutor() {
return new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
只使用一個線程,使用無界隊列LinkedBlockingQueue,線程建立後不會超時終止,該線程順序執行全部任務。該線程池適用於須要確保全部任務被順序執行的場合。
newFixedThreadPool的代碼爲:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
使用固定數目的n個線程,使用無界隊列LinkedBlockingQueue,線程建立後不會超時終止。和newSingleThreadExecutor同樣,因爲是無界隊列,若是排隊任務過多,可能會消耗很是大的內存。
newCachedThreadPool的代碼爲:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
它的corePoolSize爲0,maximumPoolSize爲Integer.MAX_VALUE,keepAliveTime是60秒,隊列爲SynchronousQueue。
它的含義是,當新任務到來時,若是正好有空閒線程在等待任務,則其中一個空閒線程接受該任務,不然就老是建立一個新線程,建立的總線程個數不受限制,對任一空閒線程,若是60秒內沒有新任務,就終止。
實際中,應該使用newFixedThreadPool仍是newCachedThreadPool呢?
在系統負載很高的狀況下,newFixedThreadPool能夠經過隊列對新任務排隊,保證有足夠的資源處理實際的任務,而newCachedThreadPool會爲每一個任務建立一個線程,致使建立過多的線程競爭CPU和內存資源,使得任何實際任務都難以完成,這時,newFixedThreadPool更爲適用。
不過,若是系統負載不過高,單個任務的執行時間也比較短,newCachedThreadPool的效率可能更高,由於任務能夠不經排隊,直接交給某一個空閒線程。
在系統負載可能極高的狀況下,二者都不是好的選擇,newFixedThreadPool的問題是隊列過長,而newCachedThreadPool的問題是線程過多,這時,應根據具體狀況自定義ThreadPoolExecutor,傳遞合適的參數。
關於提交給線程池的任務,咱們須要特別注意一種狀況,就是任務之間有依賴,這種狀況可能會出現死鎖。好比任務A,在它的執行過程當中,它給一樣的任務執行服務提交了一個任務B,但須要等待任務B結束。
若是任務A是提交給了一個單線程線程池,就會出現死鎖,A在等待B的結果,而B在隊列中等待被調度。
若是是提交給了一個限定線程個數的線程池,也有可能出現死鎖,咱們看個簡單的例子:
public class ThreadPoolDeadLockDemo {
private static final int THREAD_NUM = 5;
static ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
static class TaskA implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Future<?> future = executor.submit(new TaskB());
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("finished task A");
}
}
static class TaskB implements Runnable {
@Override
public void run() {
System.out.println("finished task B");
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
executor.execute(new TaskA());
}
Thread.sleep(2000);
executor.shutdown();
}
}
複製代碼
以上代碼使用newFixedThreadPool建立了一個5個線程的線程池,main程序提交了5個TaskA,TaskA會提交一個TaskB,而後等待TaskB結束,而TaskB因爲線程已被佔滿只能排隊等待,這樣,程序就會死鎖。
怎麼解決這種問題呢?
替換newFixedThreadPool爲newCachedThreadPool,讓建立線程再也不受限,這個問題就沒有了。
另外一個解決方法,是使用SynchronousQueue,它能夠避免死鎖,怎麼作到的呢?對於普通隊列,入隊只是把任務放到了隊列中,而對於SynchronousQueue來講,入隊成功就意味着已有線程接受處理,若是入隊失敗,能夠建立更多線程直到maximumPoolSize,若是達到了maximumPoolSize,會觸發拒絕機制,無論怎麼樣,都不會死鎖。咱們將建立executor的代碼替換爲:
static ExecutorService executor = new ThreadPoolExecutor(
THREAD_NUM, THREAD_NUM, 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
複製代碼
只是更改隊列類型,運行一樣的程序,程序不會死鎖,不過TaskA的submit調用會拋出異常RejectedExecutionException,由於入隊會失敗,而線程個數也達到了最大值。
本節介紹了線程池的基本概念,詳細探討了其主要參數的含義,理解這些參數對於合理使用線程池是很是重要的,對於相互依賴的任務,須要特別注意,避免出現死鎖。
ThreadPoolExecutor實現了生產者/消費者模式,工做者線程就是消費者,任務提交者就是生產者,線程池本身維護任務隊列。當咱們碰到相似生產者/消費者問題時,應該優先考慮直接使用線程池,而非從新發明輪子,本身管理和維護消費者線程及任務隊列。
在異步任務程序中,一種常見的場景是,主線程提交多個異步任務,而後有任務完成就處理結果,而且按任務完成順序逐個處理,對於這種場景,Java併發包提供了一個方便的方法,使用CompletionService,讓咱們下一節來探討它。
(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…)
未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。