本文你將得到如下信息:html
爲了方便讀者理解,本文會由淺入深,先從線程池的使用開始再延伸到源碼解讀和源碼分析等高級內容,讀者可根據本身的狀況自主選擇閱讀順序和須要瞭解的章節。java
線程池可以更加充分的利用CPU、內存、網絡、IO等系統資源,線程池的主要做用以下:git
阿里巴巴Java開發手冊強制規定:線程資源必須經過線程池提供,以下圖:github
本節會介紹7種線程池的建立與使用,線程池的狀態介紹,ThreadPoolExecutor參數介紹等。算法
線程池可使用Executors和ThreadPoolExecutor,其中使用Executors有六種建立線程池的方法,以下圖:api
// 使用Executors方式建立 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2); ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); ExecutorService workStealingPool = Executors.newWorkStealingPool(); // 原始建立方式 ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
總結: 其中newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是對ThreadPoolExecutor的封裝實現,newSingleThreadScheduledExecutor、newScheduledThreadPool則爲ThreadPoolExecutor子類ScheduledThreadPoolExecutor的封裝,用於執行延遲任務,newWorkStealingPool則爲Java 8新加的方法。緩存
從以上代碼能夠看出newSingleThreadExecutor和newSingleThreadScheduledExecutor建立的都是單線程池,那麼單線程池的意義是什麼呢?網絡
雖然是單線程池,但提供了工做隊列,生命週期管理,工做線程維護等功能。併發
ThreadPoolExecutor做爲線程池的核心方法,咱們來看一下ThreadPoolExecutor內部實現,以及封裝類是怎麼調用ThreadPoolExecutor的。oracle
先從構造函數提及,構造函數源碼以下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
參數說明:
來看一下線程池封裝類對於ThreadPoolExecutor的調用:
newSingleThreadExecutor對ThreadPoolExecutor的封裝源碼以下:
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newCachedThreadPool對ThreadPoolExecutor的封裝源碼以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newFixedThreadPool對ThreadPoolExecutor的封裝源碼以下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ScheduledExecutorService對ThreadPoolExecutor的封裝源碼以下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
newSingleThreadScheduledExecutor使用的是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,以下圖所示:
newScheduledThreadPool對ThreadPoolExecutor的封裝源碼以下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
newScheduledThreadPool使用的也是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor。
查看ThreadPoolExecutor源碼可知線程的狀態以下:
線程狀態解讀(如下內容來源於:https://javadoop.com/post/java-thread-pool):
RUNNING 定義爲 -1,SHUTDOWN 定義爲 0,其餘的都比 0 大,因此等於 0 的時候不能提交任務,大於 0 的話,連正在執行的任務也須要中斷。
看了這幾種狀態的介紹,讀者大致也能夠猜到十之八九的狀態轉換了,各個狀態的轉換過程有如下幾種:
說了那麼多下來一塊兒來看線程池的是怎麼執行任務的,線程池任務提交有兩個方法:
其中execute只能接受Runnable類型的任務,使用以下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); singleThreadExecutor.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } });
submit能夠接受Runnable或Callable類型的任務,使用以下:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } });
使用submit傳遞Callable類能夠獲取執行任務的返回值,Callable是JDK 1.5 添加的特性用於補充Runnable無返回的狀況。
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Long> result = executorService.submit(new Callable<Long>() { @Override public Long call() throws Exception { return new Date().getTime(); } }); try { System.out.println("運行結果:" + result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
在線程池中newSingleThreadScheduledExecutor和newScheduledThreadPool返回的是ScheduledExecutorService,用於執行延遲線程池的,代碼以下:
// 延遲線程池 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("time:" + new Date().getTime()); } }, 10, TimeUnit.SECONDS);
完整示例下載地址: https://github.com/vipstone/java-core-example
閱讀線程池的源碼有一個小技巧,能夠按照線程池執行的順序進行串連關聯閱讀,這樣更容易理解線程池的實現。
源碼閱讀流程解讀
咱們先從線程池的任務提交方法execute()開始閱讀,從execute()咱們會發現線程池執行的核心方法是addWorker(),在addWorker()中咱們發現啓動線程調用了start()方法,調用start()方法以後會執行Worker類的run()方法,run裏面調用runWorker(),運行程序的關鍵在於getTask()方法,getTask()方法以後就是此線程的關閉,整個線程池的工做流程也就完成了,下來一塊兒來看吧(若是本段文章沒看懂的話也能夠看完源碼以後,回過頭來再看一遍)。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 若是當前線程數少於核心線程數,那麼直接添加一個 worker 來執行任務, // 建立一個新的線程,並把當前任務 command 做爲這個線程的第一個任務(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任務成功,那麼就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就能夠返回了 // 至於執行的結果,到時候會包裝到 FutureTask 中。 // 返回 false 表明線程池不容許提交任務 if (addWorker(command, true)) return; c = ctl.get(); } // 到這裏說明,要麼當前線程數大於等於核心線程數,要麼剛剛 addWorker 失敗了 // 若是線程池處於 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 這裏面說的是,若是任務進入了 workQueue,咱們是否須要開啓新的線程 * 由於線程數在 [0, corePoolSize) 是無條件開啓新的線程 * 若是線程數已經大於等於 corePoolSize,那麼將任務添加到隊列中,而後進到這裏 */ int recheck = ctl.get(); // 若是線程池已不處於 RUNNING 狀態,那麼移除已經入隊的這個任務,而且執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 若是線程池仍是 RUNNING 的,而且線程數爲 0,那麼開啓新的線程 // 到這裏,咱們知道了,這塊代碼的真正意圖是:擔憂任務提交到隊列中了,可是線程都關閉了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是 workQueue 隊列滿了,那麼進入到這個分支 // 以 maximumPoolSize 爲界建立新的 worker, // 若是失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略 else if (!addWorker(command, false)) reject(command); }
// 第一個參數是準備提交給這個線程執行的任務,以前說了,能夠爲 null // 第二個參數爲 true 表明使用核心線程數 corePoolSize 做爲建立線程的界線,也就說建立這個線程的時候, // 若是線程池中的線程總數已經達到 corePoolSize,那麼不能響應此次建立線程的請求 // 若是是 false,表明使用最大線程數 maximumPoolSize 做爲界線 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 這個很是很差理解 // 若是線程池已關閉,並知足如下條件之一,那麼不建立新的 worker: // 1. 線程池狀態大於 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED // 2. firstTask != null // 3. workQueue.isEmpty() // 簡單分析下: // 仍是狀態控制的問題,當線程池處於 SHUTDOWN 的時候,不容許提交任務,可是已有的任務繼續執行 // 當狀態大於 SHUTDOWN 時,不容許提交任務,且中斷正在執行的任務 // 多說一句:若是線程池處於 SHUTDOWN,可是 firstTask 爲 null,且 workQueue 非空,那麼是容許建立 worker 的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 若是成功,那麼就是全部建立線程前的條件校驗都知足了,準備建立線程執行任務了 // 這裏失敗的話,說明有其餘線程也在嘗試往線程池中建立線程 if (compareAndIncrementWorkerCount(c)) break retry; // 因爲有併發,從新再讀取一下 ctl c = ctl.get(); // 正常若是是 CAS 失敗的話,進到下一個裏層的for循環就能夠了 // 但是若是是由於其餘線程的操做,致使線程池的狀態發生了變動,若有其餘線程關閉了這個線程池 // 那麼須要回到外層的for循環 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* * 到這裏,咱們認爲在當前這個時刻,能夠開始建立線程來執行任務了, * 由於該校驗的都校驗了,至於之後會發生什麼,那是之後的事,至少當前是知足條件的 */ // worker 是否已經啓動 boolean workerStarted = false; // 是否已將這個 worker 添加到 workers 這個 HashSet 中 boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 把 firstTask 傳給 worker 的構造方法 w = new Worker(firstTask); // 取 worker 中的線程對象,以前說了,Worker的構造方法會調用 ThreadFactory 來建立一個新的線程 final Thread t = w.thread; if (t != null) { // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操做「瓜熟蒂落」, // 由於關閉一個線程池須要這個鎖,至少我持有鎖的期間,線程池不會被關閉 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 小於 SHUTTDOWN 那就是 RUNNING,這個自沒必要說,是最正常的狀況 // 若是等於 SHUTDOWN,前面說了,不接受新的任務,可是會繼續執行等待隊列中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker 裏面的 thread 可不能是已經啓動的 if (t.isAlive()) throw new IllegalThreadStateException(); // 加到 workers 這個 HashSet 中 workers.add(w); int s = workers.size(); // largestPoolSize 用於記錄 workers 中的個數的最大值 // 由於 workers 是不斷增長減小的,經過這個值能夠知道線程池的大小曾經達到的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功的話,啓動這個線程 if (workerAdded) { // 啓動線程 t.start(); workerStarted = true; } } } finally { // 若是線程沒有啓動,須要作一些清理工做,如前面 workCount 加了 1,將其減掉 if (! workerStarted) addWorkerFailed(w); } // 返回線程是否啓動成功 return workerStarted; }
在這段代碼能夠看出,調用了t.start();
根據上面代碼可知,調用了Worker的t.start()以後,緊接着會調用Worker的run()方法,run()源碼以下:
public void run() { runWorker(this); }
runWorker()源碼以下:
// worker 線程啓動後調用,while 循環(即自旋!)不斷從等待隊列獲取任務並執行 // worker 初始化時,可指定 firstTask,那麼第一個任務也就能夠不須要從隊列中獲取 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 該線程的第一個任務(如有) Runnable task = w.firstTask; w.firstTask = null; // 容許中斷 w.unlock(); boolean completedAbruptly = true; try { // 循環調用 getTask 獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // 若線程池狀態大於等於 STOP,那麼意味着該線程也要中斷 /** * 若線程池STOP,請確保線程 已被中斷 * 若是沒有,請確保線程未被中斷 * 這須要在第二種狀況下進行從新檢查,以便在關中斷時處理shutdownNow競爭 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 這是一個鉤子方法,留給須要的子類實現 beforeExecute(wt, task); Throwable thrown = null; try { // 到這裏終於能夠執行任務了 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 這裏不容許拋出 Throwable,因此轉換爲 Error thrown = x; throw new Error(x); } finally { // 也是一個鉤子方法,將 task 和異常做爲參數,留給須要的子類實現 afterExecute(task, thrown); } } finally { // 置空 task,準備 getTask 下一個任務 task = null; // 累加完成的任務數 w.completedTasks++; // 釋放掉 worker 的獨佔鎖 w.unlock(); } } completedAbruptly = false; } finally { // 到這裏,須要執行線程關閉 // 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉 // 2. 任務執行過程當中發生了異常 // 第一種狀況,已經在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中說 // 第二種狀況,workCount 沒有進行處理,因此須要在 processWorkerExit 中處理 processWorkerExit(w, completedAbruptly); } }
runWorker裏面的有getTask(),來看下具體的實現:
// 此方法有三種可能 // 1. 阻塞直到獲取到任務返回。默認 corePoolSize 以內的線程是不會被回收的,它們會一直等待任務 // 2. 超時退出。keepAliveTime 起做用的時候,也就是若是這麼多時間內都沒有任務,那麼應該執行關閉 // 3. 若是發生瞭如下條件,須返回 null // 池中有大於 maximumPoolSize 個 workers 存在(經過調用 setMaximumPoolSize 進行設置) // 線程池處於 SHUTDOWN,並且 workQueue 是空的,前面說了,這種再也不接受新的任務 // 線程池處於 STOP,不只不接受新的線程,連 workQueue 中的線程也再也不執行 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 容許核心線程數內的線程回收,或當前線程數超過了核心線程數,那麼有可能發生超時關閉 // 這裏 break,是爲了避免往下執行後一個 if (compareAndDecrementWorkerCount(c)) // 兩個 if 一塊兒看:若是當前線程數 wc > maximumPoolSize,或者超時,都返回 null // 那這裏的問題來了,wc > maximumPoolSize 的狀況,爲何要返回 null? // 換句話說,返回 null 意味着關閉線程。 // 那是由於有可能開發者調用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調小了 // 若是此 worker 發生了中斷,採起的方案是重試 // 解釋下爲何會發生中斷,這個讀者要去看 setMaximumPoolSize 方法, // 若是開發者將 maximumPoolSize 調小了,致使其小於當前的 workers 數量, // 那麼意味着超出的部分線程要被關閉。從新進入 for 循環,天然會有部分線程會返回 null int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操做,減小工做線程數 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 若是此 worker 發生了中斷,採起的方案是重試 // 解釋下爲何會發生中斷,這個讀者要去看 setMaximumPoolSize 方法, // 若是開發者將 maximumPoolSize 調小了,致使其小於當前的 workers 數量, // 那麼意味着超出的部分線程要被關閉。從新進入 for 循環,天然會有部分線程會返回 null timedOut = false; } } }
線程池的執行流程以下圖:
本文總結以問答的形式展現,引自《深度解讀 java 線程池設計思想及源碼實現》,最下方附參考地址。
corePoolSize 到 maximumPoolSize 之間的線程會被回收,固然 corePoolSize 的線程也能夠經過設置而獲得回收(allowCoreThreadTimeOut(true))。
workQueue 用於存聽任務,添加任務的時候,若是當前線程數超過了 corePoolSize,那麼往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
keepAliveTime 用於設置空閒時間,若是線程數超出了 corePoolSize,而且有些線程的空閒時間超過了這個值,會執行關閉這些線程的操做
rejectedExecutionHandler 用於處理當線程池不能執行此任務時的狀況,默認有拋出 RejectedExecutionException 異常、忽略任務、使用提交任務的線程來執行此任務和將隊列中等待最久的任務刪除,而後提交此任務這四種策略,默認爲拋出異常。
若是當前線程數少於 corePoolSize,那麼提交任務的時候建立一個新的線程,並由這個線程執行這個任務;
若是當前線程數已經達到 corePoolSize,那麼將提交的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
若是隊列已滿,那麼建立新的線程來執行任務,須要保證池中的線程數不會超過 maximumPoolSize,若是此時線程數超過了 maximumPoolSize,那麼執行拒絕策略。
若是某個任務執行出現異常,那麼執行任務的線程會被關閉,而不是繼續接收其餘任務。而後會啓動一個新的線程來代替它。
書籍:《碼出高效:Java開發手冊》
Java核心技術36講:http://t.cn/EwUJvWA
深度解讀 java 線程池設計思想及源碼實現:https://javadoop.com/post/java-thread-pool
Java線程池-ThreadPoolExecutor源碼解析(基於Java8):https://www.imooc.com/article/42990
課程推薦: