線程的建立須要開闢虛擬機棧、本地方法棧、程序計數器等線程私有的內存空間,在線程銷燬時須要回收這些系統資源。頻繁的建立銷燬線程會浪費大量資源,使用線程池能夠更好的管理和協調線程的工做。緩存
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
…………
}
複製代碼
1)構造參數分析:
2)拒絕策略分析: ThreadPoolExecutor中提供了四個RejectedExecutionHandler策略。bash
3)建立線程池的其餘方式(不推薦):Executors這個線程池靜態工廠能夠建立三個線程池的包裝對象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors中關於ThreadPoolExecutor的核心方法以下:併發
// SynchronousQueue是不存儲元素的阻塞隊列,而且maximumPoolSize爲Integer.MAX_VALUE
便是無界,當主線程提交任務速度高於CachedThreadPool的處理速度時會不斷建立線程,
極端狀況下會發生OOM
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
// keepAliveTime爲0意味着多餘的空閒線程會被馬上終止,LinkedBlockingQueue的默認容量
是Integer.MAX_VALUE即無界,極端狀況下會發生OOM
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
// LinkedBlockingQueue的默認容量是Integer.MAX_VALUE即無界,極端狀況下會發生OOM
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
複製代碼
看過了上述方法以後能夠發現這三個方法構造出來的線程池都存在OOM的風險。而且不能靈活的配置線程工廠和拒絕策略,因此不推薦使用Executors來建立線程池。ui
4)向線程池提交任務:有兩個方法execute()和submit()能夠向線程池提交任務。execute()方法用於提交不須要返回值的任務,沒法判斷任務是否被線程池執行成功。submit()方法用於提交有返回值的任務(Callable)。線程池會返回一個future類型對象,經過future的get()方法能夠獲取返回值,值得注意的是get()方法會阻塞當前線程直到任務完成。spa
5)關閉線程池:有兩個方法shutdown()和shutdownNow()能夠關閉線程池。它們的原理是遍歷線程池中的工做線程,而後逐個的調用線程的interrupt()方法來中斷線程(沒法響應中斷的線程沒法終止)。它們的區別在於shutdownNow()首先將線程池狀態設置爲STOP,而後嘗試中止全部線程;shutdown()是將線程池狀態設置爲SHOTDOWN,而後中斷全部沒有正在執行任務的線程。線程
當線程池接收到一個任務以後,執行流程以下圖:code
ThreadPoolExecutor執行示意圖:cdn
下面是ThreadPoolExecutor中execute()方法的核心代碼:對象
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取用於返回線程數和線程池狀態的integer數值
int c = ctl.get();
// 一、若是工做線程數小於核心線程數,則建立任務並執行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 二、若是線程池處於RUNNING狀態則將任務加入隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 三、核心線程池和隊列都滿了,嘗試建立一個新的線程
else if (!addWorker(command, false))
// 四、若是建立失敗則執行拒絕策略
reject(command);
}
複製代碼
addWorker()主要是建立工做線程 -- 將任務包裝成Worker類。在一、3兩個步驟中建立線程時須要獲取全局鎖ReentrantLock避免被幹擾,噹噹前工做線程數大於等於corePoolSize以後幾乎全部的execute()都是在執行步驟2。 Worker在執行完任務以後還會循環獲取工做隊列的任務來執行while (task != null || (task = getTask()) != null)
,getTask()方法中獲取阻塞隊列中的任務(poll()或take(),若是核心線程會被銷燬或者當前線程數大於核心線程數則用poll()超時獲取)blog
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
: workQueue.take();
複製代碼
線程池的工做原理代碼在這裏就不具體分析了,下圖直觀的展現了線程池的工做原理。
想要合理的配置線程池首先須要分析任務特性:CPU密集型任務、IO密集型任務、混合型任務 .
CPU密集型任務:儘可能使用較小的線程池,通常爲CPU核心數+1。CPU密集型任務的CPU使用率很高,過多的線程數運行只能增長上下文切換的次數,所以會帶來額外的開銷。
IO密集型任務:使用稍大的線程池,通常爲2*CPU核心數。IO密集型任務CPU使用率並不高,可讓CPU在等待IO的時候去處理別的任務,充分利用CPU。
混合型任務:能夠將任務分紅IO密集型和CPU密集型任務,而後分別用不一樣的線程池去處理。只要分完以後兩個任務的執行時間相差不大,那麼就會比串行執行高效。若是劃分以後兩個任務執行時間相差甚遠,那麼最終的時間仍然取決於後執行完的任務,並且還要加上任務拆分與合併的開銷。
在線程池的實現中還涉及了不少併發包中的知識好比BlockingQueue、ReentrantLock、Condition等,在這裏就暫時不進行介紹了,後續會介紹它們。