若是咱們要使用線程的時候就去建立一個,這樣雖然很是簡便,可是就會有一個問題:java
若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大下降系統的效率,由於頻繁建立線程和銷燬線程須要時間。數組
那麼有沒有一種辦法使得線程能夠複用,就是執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務?緩存
這時候Java中的線程池就能夠閃亮登場了。性能優化
先來看看Executor
的框架圖:架構
image併發
接口:Executor
,CompletionService
,ExecutorService
,ScheduledExecutorService
抽象類:AbstractExecutorService
實現類:ExecutorCompletionService
,ThreadPoolExecutor
,ScheduledThreadPoolExecutor
框架
從圖中就能夠看到主要的方法,本文主要討論的是ThreadPoolExecutor
分佈式
在ThreadPoolExecutor
類中提供了四個構造方法:ide
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
從上面的代碼能夠得知,ThreadPoolExecutor
繼承了AbstractExecutorService
類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。微服務
下面解釋下一下構造器中各個參數的含義:
corePoolSize
:核心池的大小,這個參數跟後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()
或者prestartCoreThread()
方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize
個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0
,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize
後,就會把到達的任務放到緩存隊列當中;
maximumPoolSize
:線程池最大線程數,這個參數也是一個很是重要的參數,它表示在線程池中最多能建立多少個線程;
keepAliveTime
:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize
時,keepAliveTime
纔會起做用,直到線程池中的線程數不大於corePoolSize
,即當線程池中的線程數大於corePoolSize
時,若是一個線程空閒的時間達到keepAliveTime
,則會終止,直到線程池中的線程數不超過corePoolSize
。可是若是調用了allowCoreThreadTimeOut(boolean)
方法,在線程池中的線程數不大於corePoolSize
時,keepAliveTime
參數也會起做用,直到線程池中的線程數爲0
;
uni
t:參數keepAliveTime
的時間單位,有7
種取值,在TimeUnit
類中有7種靜態屬性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
workQueue
:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇:ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
ArrayBlockingQueue
和PriorityBlockingQueue
使用較少,通常使用LinkedBlockingQueue
和Synchronous
。線程池的排隊策略與BlockingQueue
有關。
threadFactory
:線程工廠,主要用來建立線程;handler
:表示當拒絕處理任務時的策略,有如下四種取值:ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
具體參數的配置與線程池的關係將在下一節講述。
從上面給出的ThreadPoolExecutor
類的代碼能夠知道,ThreadPoolExecutor
繼承了AbstractExecutorService
,咱們來看一下AbstractExecutorService
的實現:
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }
AbstractExecutorService
是一個抽象類,它實現了ExecutorService
接口。
咱們接着看ExecutorService
接口的實現:
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而ExecutorService
又是繼承了Executor
接口,咱們看一下Executor
接口的實現:
public interface Executor { void execute(Runnable command); }
到這裏,你們應該明白了ThreadPoolExecutor
、AbstractExecutorService
、ExecutorService
和Executor
幾個之間的關係了。
Executor
是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable)
,返回值爲void
,參數爲Runnable
類型,從字面意思能夠理解,就是用來執行傳進去的任務的;
而後ExecutorService
接口繼承了Executor
接口,並聲明瞭一些方法:submit
、invokeAll
、invokeAny
以及shutDown
等;
抽象類AbstractExecutorService
實現了ExecutorService
接口,基本實現了ExecutorService
中聲明的全部方法;
而後ThreadPoolExecutor
繼承了類AbstractExecutorService
。
在ThreadPoolExecutor
類中有幾個很是重要的方法:
execute() submit() shutdown() shutdownNow()
execute()
方法其實是Executor中聲明的方法,在ThreadPoolExecutor
進行了具體的實現,這個方法是ThreadPoolExecutor
的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
submit()
方法是在ExecutorService
中聲明的方法,在AbstractExecutorService
就已經有了具體的實現,在ThreadPoolExecutor
中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()
方法不一樣,它可以返回任務執行的結果,去看submit()
方法的實現,會發現它實際上仍是調用的execute()
方法,只不過它利用了Future
來獲取任務執行結果
shutdown()
和shutdownNow()
是用來關閉線程池的。
還有不少其餘的方法:
好比:getQueue()
、getPoolSize()
、getActiveCount()
、getCompletedTaskCount()
等獲取與線程池相關屬性的方法,有興趣的朋友能夠自行查閱API
。
在上一節咱們從宏觀上介紹了ThreadPoolExecutor
,下面咱們來深刻解析一下線程池的具體實現原理,將從下面幾個方面講解:
線程池狀態
2.任務的執行
3.線程池中的線程初始化
4.任務緩存隊列及排隊策略
5.任務拒絕策略
6.線程池的關閉
7.線程池容量的動態調整
在ThreadPoolExecutor
中定義了一個volatile
變量,另外定義了幾個static final
變量表示線程池的各個狀態:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState
表示當前線程池的狀態,它是一個volatile
變量用來保證線程之間的可見性;
下面的幾個static final
變量表示runState
可能的幾個取值。
當建立線程池後,初始時,線程池處於RUNNING
狀態;
若是調用了shutdown()
方法,則線程池處於SHUTDOWN
狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;
若是調用了shutdownNow()
方法,則線程池處於STOP
狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;
當線程池處於SHUTDOWN
或STOP
狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED
狀態。
在瞭解將任務提交給線程池到任務執行完畢整個過程以前,咱們先來看一下ThreadPoolExecutor
類中其餘的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(好比線程池大小 //、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工做集 private volatile long keepAliveTime; //線程存貨時間 private volatile boolean allowCoreThreadTimeOut; //是否容許爲核心線程設置存活時間 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數 private volatile int poolSize; //線程池中當前的線程數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程 private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數 private long completedTaskCount; //用來記錄已經執行完畢的任務個數
每一個變量的做用都已經標明出來了,這裏要重點解釋一下corePoolSize
、maximumPoolSize
、largestPoolSize
三個變量。
corePoolSize
在不少地方被翻譯成核心池大小,其實個人理解這個就是線程池的大小。舉個簡單的例子:
假若有一個工廠,工廠裏面有 10 個工人,每一個工人同時只能作一件任務。
所以只要當 10 個工人中有工人是空閒的,來了任務就分配給空閒的工人作;
當 10 個工人都有任務在作時,若是還來了任務,就把任務進行排隊等待;
若是說新任務數目增加的速度遠遠大於工人作任務的速度,那麼此時工廠主管可能會想補救措施,好比從新招 4 個臨時工人進來;
而後就將任務也分配給這 4 個臨時工人作;
若是說着 14 個工人作任務的速度仍是不夠,此時工廠主管可能就要考慮再也不接收新的任務或者拋棄前面的一些任務了。
當這 14 個工人當中有人空閒時,而新任務增加的速度又比較緩慢,工廠主管可能就考慮辭掉 4 個臨時工了,只保持原來的 10 個工人,畢竟請額外的工人是要花錢的。
這個例子中的corePoolSize
就是10
,而maximumPoolSize
就是14(10+4)
。
也就是說corePoolSize
就是線程池大小,maximumPoolSize
在我看來是線程池的一種補救措施,即任務量忽然過大時的一種補救措施。
不過爲了方便理解,在本文後面仍是將corePoolSize
翻譯成核心池大小。
largestPoolSize
只是一個用來起記錄做用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關係。
下面咱們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。
在ThreadPoolExecutor
類中,最核心的任務提交方法是execute()
方法,雖然經過submit
也能夠提交任務,可是實際上submit
方法裏面最終調用的仍是execute()
方法,因此咱們只須要研究execute()
方法的實現原理便可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
上面的代碼可能看起來不是那麼容易理解,下面咱們一句一句解釋:
首先,判斷提交的任務command
是否爲null
,如果null
,則拋出空指針異常;
接着是這句,這句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
因爲是或條件運算符,因此先計算前半部分的值,若是線程池中當前線程數不小於核心池大小,那麼就會直接進入下面的if
語句塊了。
若是線程池中當前線程數小於核心池大小,則接着執行後半部分,也就是執行
addIfUnderCorePoolSize(command)
若是執行完addIfUnderCorePoolSize
這個方法返回false
,則繼續執行下面的if
語句塊,不然整個方法就直接執行完畢了。
若是執行完addIfUnderCorePoolSize
這個方法返回false
,而後接着判斷:
if (runState == RUNNING && workQueue.offer(command))
若是當前線程池處於RUNNING
狀態,則將任務放入任務緩存隊列;若是當前線程池不處於RUNNING
狀態或者任務放入緩存隊列失敗,則執行:
addIfUnderMaximumPoolSize(command)
若是執行addIfUnderMaximumPoolSize
方法失敗,則執行reject()
方法進行任務拒絕處理。
回到前面:
if (runState == RUNNING && workQueue.offer(command))
這句的執行,若是說當前線程池處於RUNNING
狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:
if (runState != RUNNING || poolSize == 0)
這句判斷是爲了防止在將此任務添加進任務緩存隊列的同時其餘線程忽然調用shutdown
或者shutdownNow
方法關閉了線程池的一種應急措施。若是是這樣就執行:
ensureQueuedTaskHandled(command)
進行應急處理,從名字能夠看出是保證 添加到任務緩存隊列中的任務獲得處理。
咱們接着看2
個關鍵方法的實現:addIfUnderCorePoolSize
和addIfUnderMaximumPoolSize
:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //建立線程去執行firstTask任務 } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
這個是addIfUnderCorePoolSize
方法的具體實現,從名字能夠看出它的意圖就是當低於核心吃大小時執行的方法。
下面看其具體實現,首先獲取到鎖,由於這地方涉及到線程池狀態的變化,先經過if
語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()
方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小纔會執行addIfUnderCorePoolSize
方法的,爲什麼這地方還要繼續判斷?
緣由很簡單,前面的判斷過程當中並無加鎖,所以可能在execute
方法判斷的時候poolSize
小於corePoolSize
,而判斷完以後,在其餘線程中又向線程池提交了任務,就可能致使poolSize
不小於corePoolSize
了,因此須要在這個地方繼續判斷。而後接着判斷線程池的狀態是否爲RUNNING
,緣由也很簡單,由於有可能在其餘線程中調用了shutdown
或者shutdownNow
方法。
而後就是執行
t = addThread(firstTask);
這個方法也很是關鍵,傳進去的參數爲提交的任務,返回值爲Thread
類型。而後接着在下面判斷t
是否爲空,爲空則代表建立線程失敗(即poolSize>=corePoolSize
或者runState
不等於RUNNING
),不然調用t.start()
方法啓動線程。
咱們來看一下addThread
方法的實現:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //建立一個線程,執行任務 if (t != null) { w.thread = t; //將建立的線程的引用賦值爲w的成員變量 workers.add(w); int nt = ++poolSize; //當前線程數加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
在addThread
方法中,首先用提交的任務建立了一個Worker
對象,而後調用線程工廠threadFactory
建立了一個新的線程t
,而後將線程t的引用賦值給了Worker
對象的成員變量thread
,接着經過workers.add(w)將Worker
對象添加到工做集當中。
下面咱們看一下Worker
類的實現:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶能夠根據 //本身須要重載這個方法和後面的afterExecute方法來進行一些統計信息,好比某個任務的執行時間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當任務隊列中沒有任務時,進行清理工做 } } }
它實際上實現了Runnable
接口,所以上面的Thread t = threadFactory.newThread(w)
;效果跟下面這句的效果基本同樣:
Thread t = new Thread(w);
至關於傳進去了一個Runnable
任務,在線程t中執行這個Runnable
。
既然Worker
實現了Runnable
接口,那麼天然最核心的方法即是run()
方法了:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
從run
方法的實現能夠看出,它首先執行的是經過構造器傳進來的任務firstTask
,在調用runTask()
執行完firstTask
以後,在while循環裏面不斷經過getTask()
去取新的任務來執行,那麼去哪裏取呢?天然是從任務緩存隊列裏面去取,getTask
是ThreadPoolExecutor
類中的方法,並非Worker
類中的方法,下面是getTask
方法的實現:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是線程數大於核心池大小或者容許爲核心池線程設置空閒時間, //則經過poll取任務,若等待必定的時間取不到任務,則返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //若是沒取到任務,即r爲null,則判斷當前的worker是否能夠退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中斷處於空閒狀態的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
在getTask
中,先判斷當前線程池狀態,若是runState
大於SHUTDOWN
(即爲STOP
或者TERMINATED
),則直接返回null
。
若是runState
爲SHUTDOWN
或者RUNNING
,則從任務緩存隊列取任務。
若是當前線程池的線程數大於核心池大小corePoolSize
或者容許爲核心池中的線程設置空閒存活時間,則調用poll(time,timeUnit)
來取任務,這個方法會等待必定的時間,若是取不到任務就返回null
。
而後判斷取到的任務r是否爲null
,爲null
則經過調用workerCanExit()
方法來判斷當前worker
是否能夠退出,咱們看一下workerCanExit()
的實現:
private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; //若是runState大於等於STOP,或者任務緩存隊列爲空了 //或者 容許爲核心池線程設置空閒存活時間而且線程池中的線程數目大於1 try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; }
也就是說若是線程池處於STOP
狀態、或者任務隊列已爲空或者容許爲核心池線程設置空閒存活時間而且線程數大於1
時,容許worker
退出。若是容許worker
退出,則調用interruptIdleWorkers()
中斷處於空閒狀態的worker
,咱們看一下interruptIdleWorkers()
的實現:
void interruptIdleWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //實際上調用的是worker的interruptIfIdle()方法 w.interruptIfIdle(); } finally { mainLock.unlock(); } }
從實現能夠看出,它實際上調用的是worker
的interruptIfIdle()
方法,在worker
的interruptIfIdle()
方法中:
void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { //注意這裏,是調用tryLock()來獲取鎖的,由於若是當前worker正在執行任務,鎖已經被獲取了,是沒法獲取到鎖的 //若是成功獲取了鎖,說明當前worker處於空閒狀態 try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } }
這裏有一個很是巧妙的設計方式,假如咱們來設計線程池,可能會有一個任務分派線程,當發現有線程空閒時,就從任務緩存隊列中取一個任務交給空閒線程執行。可是在這裏,並無採用這樣的方式,由於這樣會要額外地對任務分派線程進行管理,無形地會增長難度和複雜度,這裏直接讓執行完任務的線程去任務緩存隊列裏面取任務來執行。
咱們再看addIfUnderMaximumPoolSize
方法的實現,這個方法的實現思想和addIfUnderCorePoolSize
方法的實現思想很是類似,惟一的區別在於addIfUnderMaximumPoolSize
方法是在線程池中的線程數達到了核心池大小而且往任務隊列中添加任務失敗的狀況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
看到沒有,其實它和addIfUnderCorePoolSize
方法的實現基本如出一轍,只是if
語句判斷條件中的poolSize
< maximumPoolSize
不一樣而已。
到這裏,大部分朋友應該對任務提交給線程池以後到被執行的整個過程有了一個基本的瞭解,下面總結一下:
1)首先,要清楚corePoolSize
和maximumPoolSize
的含義;
2)其次,要知道Worker
是用來起到什麼做用的;
3)要知道任務提交給線程池以後的處理策略,這裏總結一下主要有4
點:
corePoolSize
,則每來一個任務,就會建立一個線程去執行這個任務;corePoolSize
,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;maximumPoolSize
,則會採起任務拒絕策略進行處理;corePoolSize
時,若是某線程空閒時間超過keepAliveTime
,線程將被終止,直至線程池中的線程數目不大於corePoolSize
;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime
,線程也會被終止。默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。
在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:
prestartCoreThread()
:初始化一個核心線程;
prestartAllCoreThreads()
:初始化全部核心線程
下面是這2
個方法的實現:
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null ++n; return n; }
注意上面傳進去的參數是null
,根據第2
小節的分析可知若是傳進去的參數爲null
,則最後執行線程會阻塞在getTask
方法中的
`r = workQueue.take()`;
即等待任務隊列中有任務。
在前面咱們屢次提到了任務緩存隊列,即workQueue
,它用來存放等待執行的任務。
workQueue
的類型爲BlockingQueue<Runnable>
,一般能夠取下面三種類型:
1)ArrayBlockingQueue
:基於數組的先進先出隊列,此隊列建立時必須指定大小;
2)LinkedBlockingQueue
:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE
;
3)synchronousQueue
:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize
,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
ThreadPoolExecutor
提供了兩個方法,用於線程池的關閉,分別是shutdown()
和shutdownNow()
,其中:
shutdown()
:不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務shutdownNow()
:當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務ThreadPoolExecutor
提供了動態調整線程池容量大小的方法:setCorePoolSize()
和setMaximumPoolSize()
,
setCorePoolSize
:設置核心池大小
setMaximumPoolSize
:設置線程池最大能建立的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor
進行線程賦值,還可能當即建立新的線程來執行任務。
前面咱們討論了關於線程池的實現原理,這一節咱們來看一下它的具體使用:
public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+ executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在執行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"執行完畢"); } }
執行結果:
正在執行task 0 線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 1 線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 2 線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 3 線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task 4 線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 10 線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 11 線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 12 線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 13 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task 14 task 3執行完畢 task 0執行完畢 task 2執行完畢 task 1執行完畢 正在執行task 8 正在執行task 7 正在執行task 6 正在執行task 5 task 4執行完畢 task 10執行完畢 task 11執行完畢 task 13執行完畢 task 12執行完畢 正在執行task 9 task 14執行完畢 task 8執行完畢 task 5執行完畢 task 7執行完畢 task 6執行完畢 task 9執行完畢
從執行結果能夠看出,當線程池中線程的數目大於5
時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。若是上面程序中,將for
循環中改爲執行20
個任務,就會拋出任務拒絕異常了。
不過在java doc
中,並不提倡咱們直接使用ThreadPoolExecutor
,而是使用Executors
類中提供的幾個靜態方法來建立線程池:
Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //建立容量爲1的緩衝池 Executors.newFixedThreadPool(int); //建立固定容量大小的緩衝池
下面是這三個靜態方法的具體實現;
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor
,只不過參數都已配置好了。
newFixedThreadPool
建立的線程池corePoolSize
和maximumPoolSize
值是相等的,它使用的LinkedBlockingQueue
;
newSingleThreadExecutor
將corePoolSize
和maximumPoolSize
都設置爲1
,也使用的LinkedBlockingQueue
;
newCachedThreadPool
將corePoolSize
設置爲0
,將maximumPoolSize
設置爲Integer.MAX_VALUE
,使用的SynchronousQueue
,也就是說來了任務就建立線程運行,當線程空閒超過60
秒,就銷燬線程。
實際中,若是Executors
提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor
的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,若是ThreadPoolExecutor
達不到要求,能夠本身繼承ThreadPoolExecutor
類進行重寫。
學習永不止步,在看到這篇文章以前我想你對線程池的理解沒有這麼深,因此說學習更重要的是一個氛圍和對比,知作別人有多牛逼,纔會發現本身有多low筆。所以我給你們推薦一個Java架構羣:895244712,裏面
有分佈式,微服務,性能優化等技術點底層原理的視頻,也有衆多想要提高的小夥伴討論技術,歡迎你們加羣一塊兒交流學習。
再來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。
通常須要根據任務的類型來配置線程池大小:
若是是CPU
密集型任務,就須要儘可能壓榨CPU
,參考值能夠設爲NCPU+1
若是是IO
密集型任務,參考值能夠設置爲2*NCPU
固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。
轉自:https://www.jianshu.com/p/122224675765