(手機橫屏看源碼更方便)java
注:java源碼分析部分如無特殊說明均基於 java8 版本。ide
注:線程池源碼部分如無特殊說明均指ThreadPoolExecutor類。oop
前面咱們一塊兒學習了Java中線程池的體系結構、構造方法和生命週期,本章咱們一塊兒來學習線程池中普通任務究竟是怎麼執行的。源碼分析
建議學習本章前先去看看彤哥以前寫的《死磕 java線程系列之本身動手寫一個線程池》那兩章,有助於理解本章的內容,且那邊的代碼比較短小,學起來相對容易一些。學習
(1)線程池中的普通任務是怎麼執行的?this
(2)任務又是在哪裏被執行的?線程
(3)線程池中有哪些主要的方法?翻譯
(4)如何使用Debug模式一步一步調試線程池?debug
咱們建立一個線程池,它的核心數量爲5,最大數量爲10,空閒時間爲1秒,隊列長度爲5,拒絕策略打印一句話。調試
若是使用它運行20個任務,會是什麼結果呢?
public class ThreadPoolTest01 { public static void main(String[] args) { // 新建一個線程池 // 核心數量爲5,最大數量爲10,空閒時間爲1秒,隊列長度爲5,拒絕策略打印一句話 ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(currentThreadName() + ", discard task"); } }); // 提交20個任務,注意觀察num for (int i = 0; i < 20; i++) { int num = i; threadPool.execute(()->{ try { System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } private static String currentThreadName() { return Thread.currentThread().getName(); } }
構造方法的7個參數咱們就不詳細解釋了,有興趣的能夠看看《死磕 java線程系列之線程池深刻解析——構造方法》那章。
咱們一塊兒來看看一次運行的結果:
pool-1-thread-1, 0 running, 1572678434411 pool-1-thread-3, 2 running, 1572678434411 pool-1-thread-2, 1 running, 1572678434411 pool-1-thread-4, 3 running, 1572678434411 pool-1-thread-5, 4 running, 1572678434411 pool-1-thread-6, 10 running, 1572678434412 pool-1-thread-7, 11 running, 1572678434412 pool-1-thread-8, 12 running, 1572678434412 main, discard task main, discard task main, discard task main, discard task main, discard task // 【本文由公從號「彤哥讀源碼」原創】 pool-1-thread-9, 13 running, 1572678434412 pool-1-thread-10, 14 running, 1572678434412 pool-1-thread-3, 5 running, 1572678436411 pool-1-thread-1, 6 running, 1572678436411 pool-1-thread-6, 7 running, 1572678436412 pool-1-thread-2, 8 running, 1572678436412 pool-1-thread-7, 9 running, 1572678436412
注意,觀察num值的打印信息,先是打印了0~4,再打印了10~14,最後打印了5~9,居然不是按順序打印的,爲何呢?
讓咱們一步一步debug進去查看。
execute()方法是線程池提交任務的方法之一,也是最核心的方法。
// 提交任務,任務並不是當即執行,因此翻譯成執行任務彷佛不太合適 public void execute(Runnable command) { // 任務不能爲空 if (command == null) throw new NullPointerException(); // 控制變量(高3位存儲狀態,低29位存儲工做線程的數量) int c = ctl.get(); // 1. 若是工做線程數量小於核心數量 if (workerCountOf(c) < corePoolSize) { // 就添加一個工做線程(核心) if (addWorker(command, true)) return; // 從新獲取下控制變量 c = ctl.get(); } // 2. 若是達到了核心數量且線程池是運行狀態,任務入隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次檢查線程池狀態,若是不是運行狀態,就移除任務並執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 容錯檢查工做線程數量是否爲0,若是爲0就建立一個 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 任務入隊列失敗,嘗試建立非核心工做線程 else if (!addWorker(command, false)) // 非核心工做線程建立失敗,執行拒絕策略 reject(command); }
關於線程池狀態的內容,咱們這裏不拿出來細講了,有興趣的能夠看看《死磕 java線程系列之線程池深刻解析——生命週期》那章。
提交任務的過程大體以下:
(1)工做線程數量小於核心數量,建立核心線程;
(2)達到核心數量,進入任務隊列;
(3)任務隊列滿了,建立非核心線程;
(4)達到最大數量,執行拒絕策略;
其實,就是三道坎——核心數量、任務隊列、最大數量,這樣就比較好記了。
流程圖大體以下:
任務流轉的過程咱們知道了,可是任務是在哪裏執行的呢?繼續往下看。
這個方法主要用來建立一個工做線程,並啓動之,其中會作線程池狀態、工做線程數量等各類檢測。
private boolean addWorker(Runnable firstTask, boolean core) { // 判斷有沒有資格建立新的工做線程 // 主要是一些狀態/數量的檢查等等 // 這段代碼比較複雜,能夠先跳過 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 線程池狀態檢查 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 工做線程數量檢查 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 數量加1並跳出循環 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 若是上面的條件知足,則會把工做線程數量加1,而後執行下面建立線程的動做 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立工做線程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次檢查線程池的狀態 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 添加到工做線程隊列 workers.add(w); // 還在池子中的線程數量(只能在mainLock中使用) int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 標記線程添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 線程添加成功以後啓動線程 t.start(); workerStarted = true; } } } finally { // 線程啓動失敗,執行失敗方法(線程數量減1,執行tryTerminate()方法等) if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
這裏其實還沒到任務執行的地方,上面咱們能夠看到線程是包含在Worker這個類中的,那麼,咱們就跟蹤到這個類中看看。
Worker內部類能夠看做是對工做線程的包裝,通常地,咱們說工做線程就是指Worker,但其實是指其維護的Thread實例。
// Worker繼承自AQS,自帶鎖的屬性 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 真正工做的線程 final Thread thread; // 第一個任務,從構造方法傳進來 Runnable firstTask; // 完成任務數 volatile long completedTasks; // 構造方法// 【本文由公從號「彤哥讀源碼」原創】 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 使用線程工廠生成一個線程 // 注意,這裏把Worker自己做爲Runnable傳給線程 this.thread = getThreadFactory().newThread(this); } // 實現Runnable的run()方法 public void run() { // 調用ThreadPoolExecutor的runWorker()方法 runWorker(this); } // 省略鎖的部分 }
這裏要可以看出來工做線程Thread啓動的時候實際是調用的Worker的run()方法,進而調用的是ThreadPoolExecutor的runWorker()方法。
runWorker()方法是真正執行任務的地方。
final void runWorker(Worker w) { // 工做線程 Thread wt = Thread.currentThread(); // 任務 Runnable task = w.firstTask; w.firstTask = null; // 強制釋放鎖(shutdown()裏面有加鎖) // 這裏至關於無視那邊的中斷標記 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 取任務,若是有第一個任務,這裏先執行第一個任務 // 只要能取到任務,這就是個死循環 // 正常來講getTask()返回的任務是不可能爲空的,由於前面execute()方法是有空判斷的 // 那麼,getTask()何時纔會返回空任務呢? while (task != null || (task = getTask()) != null) { w.lock(); // 檢查線程池的狀態 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 鉤子方法,方便子類在任務執行前作一些處理 beforeExecute(wt, task); Throwable thrown = null; try { // 真正任務執行的地方 task.run(); // 異常處理 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 鉤子方法,方便子類在任務執行後作一些處理 afterExecute(task, thrown); } } finally { // task置爲空,從新從隊列中取 task = null; // 完成任務數加1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 到這裏確定是上面的while循環退出了 processWorkerExit(w, completedAbruptly); } }
這個方法比較簡單,忽略狀態檢測和鎖的內容,若是有第一個任務,就先執行之,以後再從任務隊列中取任務來執行,獲取任務是經過getTask()來進行的。
從隊列中獲取任務的方法,裏面包含了對線程池狀態、空閒時間等的控制。
private Runnable getTask() { // 是否超時 boolean timedOut = false; // 死循環 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 線程池狀態是SHUTDOWN的時候會把隊列中的任務執行完直到隊列爲空 // 線程池狀態是STOP時當即退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 工做線程數量// 【本文由公從號「彤哥讀源碼」原創】 int wc = workerCountOf(c); // 是否容許超時,有兩種狀況: // 1. 是容許核心線程數超時,這種就是說全部的線程均可能超時 // 2. 是工做線程數大於了核心數量,這種確定是容許超時的 // 注意,非核心線程是必定容許超時的,這裏的超時實際上是指取任務超時 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 超時判斷(還包含一些容錯判斷) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 超時了,減小工做線程數量,並返回null if (compareAndDecrementWorkerCount(c)) return null; // 減小工做線程數量失敗,則重試 continue; } try { // 真正取任務的地方 // 默認狀況下,只有當工做線程數量大於核心線程數量時,纔會調用poll()方法觸發超時調用 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 取到任務了就正常返回 if (r != null) return r; // 沒取到任務代表超時了,回到continue那個if中返回null timedOut = true; } catch (InterruptedException retry) { // 捕獲到了中斷異常 // 中斷標記是在調用shutDown()或者shutDownNow()的時候設置進去的 // 此時,會回到for循環的第一個if處判斷狀態是否要返回null timedOut = false; } } }
注意,這裏取任務會根據工做線程的數量判斷是使用BlockingQueue的poll(timeout, unit)方法仍是take()方法。
poll(timeout, unit)方法會在超時時返回null,若是timeout<=0,隊列爲空時直接返回null。
take()方法會一直阻塞直到取到任務或拋出中斷異常。
因此,若是keepAliveTime設置爲0,當任務隊列爲空時,非核心線程取不出來任務,會當即結束其生命週期。
默認狀況下,是不容許核心線程超時的,可是能夠經過下面這個方法設置使核心線程也可超時。
public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } }
至此,線程池中任務的執行流程就結束了。
觀察num值的打印信息,先是打印了0~4,再打印了10~14,最後打印了5~9,居然不是按順序打印的,爲何呢?
線程池的參數:核心數量5個,最大數量10個,任務隊列5個。
答:執行前5個任務執行時,正好還不到核心數量,因此新建核心線程並執行了他們;
執行中間的5個任務時,已達到核心數量,因此他們先入隊列;
執行後面5個任務時,已達核心數量且隊列已滿,因此新建非核心線程並執行了他們;
再執行最後5個任務時,線程池已達到滿負荷狀態,因此執行了拒絕策略。
本章經過一個例子並結合線程池的重要方法咱們一塊兒分析了線程池中普通任務執行的流程。
(1)execute(),提交任務的方法,根據核心數量、任務隊列大小、最大數量,分紅四種狀況判斷任務應該往哪去;
(2)addWorker(),添加工做線程的方法,經過Worker內部類封裝一個Thread實例維護工做線程的執行;
(3)runWorker(),真正執行任務的地方,先執行第一個任務,再源源不斷從任務隊列中取任務來執行;
(4)getTask(),真正從隊列取任務的地方,默認狀況下,根據工做線程數量與核心數量的關係判斷使用隊列的poll()仍是take()方法,keepAliveTime參數也是在這裏使用的。
核心線程和非核心線程有什麼區別?
答:實際上並無什麼區別,主要是根據corePoolSize來判斷任務該去哪裏,二者在執行任務的過程當中並無任何區別。有可能新建的時候是核心線程,而keepAliveTime時間到告終束了的也多是剛開始建立的核心線程。
Worker繼承自AQS有何意義?
前面咱們看了Worker內部類的定義,它繼承自AQS,天生自帶鎖的特性,那麼,它的鎖是用來幹什麼的呢?跟任務的執行有關係嗎?
答:既然是跟鎖(同步)有關,說明Worker類跨線程使用了,此時咱們查看它的lock()方法發現只在runWorker()方法中使用了,可是其tryLock()倒是在interruptIdleWorkers()方法中使用的。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers()方法的意思是中斷空閒線程的意思,它只會中斷BlockingQueue的poll()或take()方法,而不會中斷正在執行的任務。
通常來講,interruptIdleWorkers()方法的調用不是在本工做線程,而是在主線程中調用的,還記得《死磕 java線程系列之線程池深刻解析——生命週期》中說過的shutdown()和shutdownNow()方法嗎?
觀察兩個方法中中斷線程的方法,shutdown()中就是調用了interruptIdleWorkers()方法,這裏tryLock()獲取到鎖了再中斷,若是沒有獲取到鎖則不中斷,沒獲取到鎖只有一種狀況,也就是lock()所在的地方,也就是有任務正在執行。
而shutdownNow()中中斷線程則很暴力,並無tryLock(),而是直接中斷了線程,因此調用shutdownNow()可能會中斷正在執行的任務。
因此,Worker繼承自AQS實際是要使用其鎖的能力,這個鎖主要是用來控制shutdown()時不要中斷正在執行任務的線程。
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。