上一篇從總體上介紹了Executor
接口,從上一篇咱們知道了Executor
框架的最頂層實現是ThreadPoolExecutor
類,Executors
工廠類中提供的newScheduledThreadPool
、newFixedThreadPool
、newCachedThreadPool
方法其實也只是ThreadPoolExecutor
的構造函數參數不一樣而已。經過傳入不一樣的參數,就能夠構造出適用於不一樣應用場景下的線程池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor
線程池的運行過程。java
既然要講運行過程,那麼首先要了解下線程池的狀態分爲哪些?spring
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
ThreadPoolExecutor
代碼中定義了上面幾個變量:定義了一個volatile變量runState,以及其餘幾個表示狀態的常量。
runState
:初始狀態,表示當前線程池的運行狀態,它的值就是上面的那4個常量值之一緩存
RUNNING
:線程池接受新任務並執行隊列任務中...框架
SHUTDOWN
:再也不接受新任務,可是會繼續執行等待隊列Queued中的任務。當調用了shutdown()方法,會從 RUNNING -> SHUTDOWN函數
STOP
:再也不接受新任務,同時也不執行等待隊列Queued中的任務,而且會嘗試終止正在執行中的任務。當調用了shutdownNow()方法, 會從(RUNNING or SHUTDOWN) -> STOP學習
TERMINATED
:線程池中全部線程已經中止運行,其餘行爲同 STOP狀態。this
在講解運行過程前,咱們先看下ThreadPoolExecutor
中的幾個比較重要的成員變量:線程
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來保存等待中的任務,等待worker線程空閒時執行任務 private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 時須要持有這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來保存工做中的執行線程 private volatile long keepAliveTime; //超過corePoolSize外的線程空閒存活之間 private volatile boolean allowCoreThreadTimeOut; //是否對corePoolSize內的線程設置空閒存活時間 private volatile int corePoolSize; //核心線程數 private volatile int maximumPoolSize; //最大線程數(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) private volatile int poolSize; //線程池中的當前線程數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來新建線程 private int largestPoolSize; //記錄線程池中出現過的最大線程數大小 private long completedTaskCount; //已經執行完的線程數
這邊重點解釋下 corePoolSize
、maximumPoolSize
、workQueue
兩個變量,這兩個變量涉及到線程池中建立線程個數的一個策略。
corePoolSize
: 這個變量咱們能夠理解爲線程池的核心大小,舉個例子來講明(corePoolSize假設等於10,maximumPoolSize等於20):設計
先看下前一篇文章中的一個例子:代理
ExecutorService executor = Executors.newFixedThreadPool(3); IntStream.range(0, 6).forEach(i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); }));
上面代碼就是新建6個任務,而後扔到線程池中運行,輸出線程名稱,直到運行完畢。其中最核心的方法就是execute()
方法,雖然submit()
也能夠執行任務,但它底層也是調用execute()
方法,因此懂了execute()
的實現原理便可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //1. if (runState == RUNNING && workQueue.offer(command)) { //2. if (runState != RUNNING || poolSize == 0) //3. ensureQueuedTaskHandled(command); //4. } else if (!addIfUnderMaximumPoolSize(command)) //5. reject(command); // is shutdown or saturated //6 } }
上面的代碼看起來邏輯有點複雜,咱們一個一個看,首先看上面1位置處:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一個或表達式,它分紅兩部分
addIfUnderCorePoolSize(command)
,這個方法是當線程數小於核心線程數時,用來新建線程執行任務(由於線程數小於corePoolSize時,直接新建線程來運行任務,無論當前線程池裏有沒有空閒的線程)。若是新建失敗,那麼進入if語句塊,成功了那麼execute方法就執行結束了,由於線程已經新建成功了,任務已經開始在線程池中運行。進入if語句塊後,看上面代碼2.if (runState == RUNNING && workQueue.offer(command))
if (!addIfUnderMaximumPoolSize(command))
,判斷新任務用新線程執行是否成功(注:這裏的新線程就是咱們上面講的 「借來的工人」 maximumPoolSize)繼續進到代碼塊3 的if語句塊if (runState != RUNNING || poolSize == 0)
, 由於新任務加入到等待隊列中了,這句判斷是爲了防止在將此任務添加進任務緩存隊列的同時其餘線程忽然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。若是是的話,應急處理加入的新任務 ensureQueuedTaskHandled(command)
。
咱們看下兩個關鍵方法的實現:
##### 1.addIfUnderCorePoolSize
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; }
首先獲取鎖,由於涉及到線程池狀態的變化。而後再次判斷 if (poolSize < corePoolSize && runState == RUNNING)
,在execute()方法中咱們已經判斷過一次,這邊再次判斷是爲了防止其餘線程又新增了新線程或者調用了shutdown、shutdownNow方法,這邊起到了雙重檢查的一個效果。若是爲true
的話,進行t = addThread(firstTask)
新增線程執行任務。addThread方法裏面比較簡單,就是經過線程工廠建立線程thread,而後封裝到Worker對象中,加入到 workers隊列中,並執行線程,能夠把Worker對象當作是擁有一個線程的對象。
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true; } } return t; }
這裏在介紹下Worker對象, 它實現了Runnable接口,你把它當成Runnable的一個代理類便可,最終也是執行它的run方法。只要注意一下Worker中的beforeExecute
和afterExecute
方法,這兩個方法在ThreadPoolExecutor中沒有具體實現,用戶能夠重寫這個方法和後面的afterExecute方法來進行一些統計信息,好比某個任務的執行時間等,而afterExecute方法還有一個Throwable t
參數,用戶能夠用來記錄一些異常信息,由於新線程中的異常時捕獲不到的,須要在afterExecute中記錄。
看起來這個是否是和spring 切面有點像,能夠看到 知識都是相通的。
看一下它的run方法:
public void run() { try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { //1 runTask(task); task = null; } } finally { workerDone(this); } }
注意代碼塊1,能夠看到這邊在循環獲取任務,並執行,直到任務所有執行完畢。除了第一個任務,其餘任務都是經過getTask()
方法去取,這個方法是ThreadPoolExecutor中的一個方法。咱們猜一下,整個類中只有任務緩存隊列中保存了任務,應該就是去緩存隊列中取了。
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); //取任務 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是線程數大於核心池大小或者容許爲核心池線程設置空閒時間, //則經過poll取任務,若等待必定的時間取不到任務,則返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //若是沒取到任務,即r爲null,則判斷當前的worker是否能夠退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中斷處於空閒狀態的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
這裏有一個很是巧妙的設計方式,假如咱們來設計線程池,可能會有一個任務分派線程,當發現有線程空閒時,就從任務緩存隊列中取一個任務交給 空閒線程執行。可是在這裏,並無採用這樣的方式,由於這樣會要額外地對任務分派線程進行管理,無形地會增長難度和複雜度,這裏直接讓執行完任務的線程Worker去任務緩存隊列裏面取任務來執行,由於每個Worker裏面都包含了一個線程thread。
這個方法的實現思想和 addIfUnderCorePoolSize方法的實現思想很是類似,惟一的區別在於addIfUnderMaximumPoolSize方法是在線程 池中的線程數達到了核心池大小而且往任務隊列中添加任務失敗的狀況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; }
到這裏,大部分朋友應該對任務提交給線程池以後到被執行的整個過程有了一個基本的瞭解,下面總結一下:
這篇寫完了,後面會介紹一下任務緩存隊列的種類已經緩存的策略以及任務拒絕策略等。若是文章有什麼問題,歡迎你們指正,你們互相溝通,互相學習。