本文首發於一世流雲專欄: https://segmentfault.com/blog...
在juc-executors框架概述的章節中,咱們已經簡要介紹過ThreadPoolExecutor
了,經過Executors工廠,用戶能夠建立本身須要的執行器對象。ThreadPoolExecutor,它是J.U.C在JDK1.5時提供的一種實現了ExecutorService接口的執行器,或者說線程池。java
ThreadPoolExecutor並無本身直接實現ExecutorService接口,由於它只是其中一種Executor的實現而已,因此Doug Lea把一些通用部分封裝成一個抽象父類——AbstractExecutorService,供J.U.C中的其它執行器繼承。若是讀者須要本身實現一個Executor,也能夠繼承該抽象類。算法
AbstractExecutorService提供了 ExecutorService 接口的默認實現——主要實現了 submit、invokeAny 、invokeAll這三類方法,若是讀者看過上一篇綜述文章,就應該知道,ExecutorService的這三類方法幾乎都是返回一個Future對象。而Future是一個接口,AbstractExecutorService既然實現了這些方法,必然要實現該Future接口,咱們來看下AbstractExecutorService實現的submit方法:數據庫
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
能夠看到,上述方法首先對Runnable和返回值value進行了封裝,經過newTaskFor
方法,封裝成了一個FutureTask對象,而後經過execute方法執行任務,最後返回異步任務對象。segmentfault
這裏實際上是模板方法模式的運用,execute是抽象方法,須要由繼承AbstractExecutorService的子類來實現。
上述須要注意的是newTaskFor方法,該方法建立了一個Future對象:設計模式
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
FutureTask其實就是Future接口的實現類:
緩存
咱們以前講過,J.U.C中的Future接口是「Future模式」的多線程設計模式的實現,可讓調用方以異步方式獲取任務的執行結果。而FutureTask即是這樣一類支持異步返回結果的任務,既然是任務就須要實現Runnable接口,同時又要支持異步功能,因此又須要實現Future接口。J.U.C爲了方便,新定義了一個接口—— RunnableFuture,該接口同時繼承Runnable和Future,表明支持異步處理的任務,而FutureTask即是它的默認實現。
本節不會在Futrure模式上花費太多筆墨,之後咱們會專門講解J.U.C對Future模式的支持。多線程
回到ThreadPoolExecutor,從該類的命名也能夠看出,這是一種線程池執行器。線程池你們應該並不陌生,應用開發中常常須要用到數據庫鏈接池,數據庫鏈接池裏維護着一些數據庫鏈接,當應用須要鏈接數據庫時,並非本身建立鏈接,而是從鏈接池中獲取可用鏈接;當關閉數據庫鏈接時,只是將該鏈接還給鏈接池,以供複用。併發
而線程池也是相似的概念,當有任務須要執行時,線程池會給該任務分配線程,若是當前沒有可用線程,通常會將任務放進一個隊列中,當有線程可用時,再從隊列中取出任務並執行,以下圖:框架
線程池的引入,主要解決如下問題:異步
瞭解了線程池和ThreadPoolExecutor的繼承體系,接下來,咱們來看下J.U.C是如何實現一個普通線程池的。
咱們先來看下ThreadPoolExecutor的構造器,其實以前在講Executors時已經接觸過了,Executors工廠方法建立的三種線程池:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,內部都是經過ThreadPoolExecutor的下面這個構造器實例化了ThreadPoolExecutor對象:
/** * 使用給定的參數建立ThreadPoolExecutor. * * @param corePoolSize 核心線程池中的最大線程數 * @param maximumPoolSize 總線程池中的最大線程數 * @param keepAliveTime 空閒線程的存活時間 * @param unit keepAliveTime的單位 * @param workQueue 任務隊列, 保存已經提交但還沒有被執行的線程 * @param threadFactory 線程工廠(用於指定若是建立一個線程) * @param handler 拒絕策略 (當任務太多致使工做隊列滿時的處理策略) */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); // 使用納秒保存存活時間 this.threadFactory = threadFactory; this.handler = handler; }
爲了用戶使用方便,ThreadPoolExecutor一共提供了4種構造器,但其它三種內部其實都調用了上面的構造器。
正是經過上述參數的組合變換,使得Executors工廠能夠建立不一樣類型的線程池。這裏先簡要講一下corePoolSize
和maximumPoolSize
這兩個參數:
ThreadPoolExecutor在邏輯上將自身管理的線程池劃分爲兩部分:核心線程池(大小對應爲corePoolSize)、非核心線程池(大小對應爲maximumPoolSize-corePoolSize)。
當咱們向線程池提交一個任務時,將建立一個工做線程——咱們稱之爲Worker,Worker在邏輯上從屬於下圖中的【核心線程池】或【非核心線程池】,具體屬於哪種,要根據corePoolSize、maximumPoolSize、Worker總數進行判斷:
注意:咱們上面一直在提【工做線程】、【核心線程池】、【非核心線程池】,讀者可能都看暈了,包括我本身第一次學習ThreadPoolExecutor時也被網上和垃圾國產技術書籍的錯誤描述給誤導了。我這裏先提一下,後面咱們分析線程池的任務調度流程時會再詳細說明:
- ThreadPoolExecutor中只有一種類型的線程,名叫Worker,它是ThreadPoolExecutor定義的內部類,同時封裝着Runnable任務和執行該任務的Thread對象,咱們稱它爲【工做線程】,它也是ThreadPoolExecutor惟一須要進行維護的線程;
- 【核心線程池】【非核心線程池】都是邏輯上的概念,ThreadPoolExecutor在任務調度過程當中會根據
corePoolSize
和maximumPoolSize
的大小,判斷應該如何調度任務.
到這裏,讀者可能會思考一個問題:既然是線程池,那麼必然有線程池狀態,同時也涉及對其中的工做線程(Worker)的管理,ThreadPoolExecutor是如何作的呢?
ThreadPoolExecutor內部定義了一個AtomicInteger變量——ctl,經過按位劃分的方式,在一個變量中記錄線程池狀態和工做線程數——低29位保存線程數,高3位保存線程池狀態:
/** * 保存線程池狀態和工做線程數: * 低29位: 工做線程數 * 高3位 : 線程池狀態 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 最大線程數: 2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111 // 線程池狀態 private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000 00000000 00000000 00000000 private static final int STOP = 1 << COUNT_BITS; // 00100000 00000000 00000000 00000000 private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000 private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
能夠看到,ThreadPoolExecutor一共定義了5種線程池狀態:
各個狀態之間的流轉圖:
另外,咱們剛纔也提到工做線程(Worker),Worker被定義爲ThreadPoolExecutor的內部類,實現了AQS框架,ThreadPoolExecutor經過一個HashSet來保存工做線程:
/** * 工做線程集合. */ private final HashSet<Worker> workers = new HashSet<Worker>();
工做線程的定義以下:
/** * Worker表示線程池中的一個工做線程, 能夠與任務相關聯. * 因爲實現了AQS框架, 其同步狀態值的定義以下: * -1: 初始狀態 * 0: 無鎖狀態 * 1: 加鎖狀態 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * 與該Worker關聯的線程. */ final Thread thread; /** * Initial task to run. Possibly null. */ Runnable firstTask; /** * Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // 初始的同步狀態值 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** * 執行任務 */ public void run() { runWorker(this); } /** * 是否加鎖 */ protected boolean isHeldExclusively() { return getState() != 0; } /** * 嘗試獲取鎖 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 嘗試釋放鎖 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中斷線程(僅任務非初始狀態) */ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
經過Worker的定義能夠看到,每一個Worker對象都有一個Thread線程對象與它相對應,當任務須要執行的時候,實際是調用內部Thread對象的start方法,而Thread對象是在Worker的構造器中經過getThreadFactory().newThread(this)
方法建立的,建立的Thread將Worker自身做爲任務,因此當調用Thread的start
方法時,最終實際是調用了Worker.run()
方法,該方法內部委託給runWorker
方法執行任務,這個方法咱們後面會詳細介紹。
ThreadFactory用來建立單個線程,當線程池須要建立一個線程時,就要調用該類的newThread(Runnable r)
方法建立線程(ThreadPoolExecutor中實際建立線程的時刻是在將任務包裝成工做線程Worker時)。
ThreadPoolExecutor在構造時若是用戶不指定ThreadFactory,則默認使用Executors.defaultThreadFactory()
建立一個ThreadFactory,即Executors.DefaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } /** * 默認的線程工廠. */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
這裏的關鍵是要明白爲何須要用ThreadFactory來建立線程,而不是直接經過new Thread()的方式。這個問題在executors框架概述中已經談過了,這樣作的好處是:一來解耦對象的建立與使用,二來能夠批量配置線程信息(優先級、線程名稱、是否守護線程等),以自由設置池子中全部線程的狀態。
ExecutorService的核心方法是submit方法——用於提交一個待執行的任務,若是讀者閱讀ThreadPoolExecutor的源碼,會發現它並無覆寫submit方法,而是沿用了父類AbstractExecutorService的模板,而後本身實現了execute
方法:
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
ThreadPoolExecutor的execute方法定義以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // CASE1: 工做線程數 < 核心線程池上限 if (addWorker(command, true)) // 添加工做線程並執行 return; c = ctl.get(); } // 執行到此處, 說明工做線程建立失敗 或 工做線程數≥核心線程池上限 if (isRunning(c) && workQueue.offer(command)) { // CASE2: 插入任務至隊列 // 再次檢查線程池狀態 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // CASE3: 插入隊列失敗, 判斷工做線程數 < 總線程池上限 reject(command); // 執行拒絕策略 }
上述execute的執行流程能夠用下圖描述:
這裏須要特別注意的是 CASE2中的addWorker(null, false)
,當將任務成功添加到隊列後,若是此時的工做線程數爲0,就會執行這段代碼。
通常來說每一個工做線程(Worker)都有一個Runnable任務和一個對應的執行線程Thread,當咱們調用addWorker方法時,若是不傳入相應的任務,那麼就只是新建了一個沒有任務的工做線程(Worker),該Worker就會從工做隊列中取任務來執行(由於本身沒有綁定任務)。若是傳入了任務,新建的工做線程就會執行該任務。
因此execute方法的CASE2中,將任務添加到隊列後,須要判斷工做線程數是否爲0,若是是0那麼就必須新建一個空任務的工做線程,未來在某一時刻它會去隊列取任務執行,不然沒有工做線程的話,該隊列中的任務永遠不會被執行。
另外,這裏又要回到【工做線程】、【核心線程池】、【非核心線程池】、【總線程池】的概念上了。
再強調一遍,maximumPoolSize限定了整個線程池的大小,corePoolSize限定了核心線程池的大小,corePoolSize≤maximumPoolSize(當相等時表示爲固定線程池);maximumPoolSize-corePoolSize表示非核心線程池。
execute的整個執行流程關鍵是下面兩點:
CorePoolSize ≤ 工做線程數 < maximumPoolSize
)新建一個工做線程當即執行任務,不然執行拒絕策略。瞭解了ThreadPoolExecutor的整個執行流程,咱們來看下它是如何添加工做線程並執行任務的,execute方法內部調用了addWorker方法來添加工做線程並執行任務:
/** * 添加工做線程並執行任務 * * @param firstTask 若是指定了該參數, 表示將當即建立一個新工做線程執行該firstTask任務; 不然複用已有的工做線程,從工做隊列中獲取任務並執行 * @param core 執行任務的工做線程歸屬於哪一個線程池: true-核心線程池 false-非核心線程池 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 獲取線程池狀態 /** * 這個if主要是判斷哪些狀況下, 線程池再也不接受新任務執行, 而是直接返回.總結下, 有如下幾種狀況: * 1. 線程池狀態爲 STOP 或 TIDYING 或 TERMINATED: 線程池狀態爲上述任一一種時, 都不會再接受任務,因此直接返回 * 2. 線程池狀態≥ SHUTDOWN 且 firstTask != null: 由於當線程池狀態≥ SHUTDOWN時, 再也不接受新任務的提交,因此直接返回 * 3. 線程池狀態≥ SHUTDOWN 且 隊列爲空: 隊列中已經沒有任務了, 因此也就不須要執行任何任務了,能夠直接返回 */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { int wc = workerCountOf(c); // 獲取工做線程數 /** * 這個if主要是判斷工做線程數是否超限, 如下任一狀況屬於屬於超限, 直接返回: * 1. 工做線程數超過最大工做線程數(2^29-1) * 2. 工做線程數超過核心線程池上限(入參core爲true, 表示歸屬核心線程池) * 3. 工做線程數超過總線程池上限(入參core爲false, 表示歸屬非核心線程池) */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 工做線程數加1 break retry; // 跳出最外層循環 c = ctl.get(); if (runStateOf(c) != rs) // 線程池狀態發生變化, 從新自旋判斷 continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 將任務包裝成工做線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 從新檢查線程池狀態 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); // 加入工做線程集合 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) // 建立/啓動工做線程失敗, 須要執行回滾操做 addWorkerFailed(w); } return workerStarted; }
整個addWorker的邏輯並不複雜,分爲兩部分:
第一部分是一個自旋操做,主要是對線程池的狀態進行一些判斷,若是狀態不適合接受新任務,或者工做線程數超出了限制,則直接返回false。
這裏須要注意的就是core
參數,爲true時表示新建的工做線程在邏輯上歸屬於核心線程池,因此須要判斷條件工做線程數 < corePoolSize
是否知足;core爲false時表示在新增的工做線程邏輯上屬於非核心線程池,因此須要判斷條件工做線程數 < maximumPoolSize
是否知足。
通過第一部分的過濾,第二部分才真正去建立工做線程並執行任務:
首先將Runnable任務包裝成一個Worker對象,而後加入到一個工做線程集合中(名爲workers的HashSet),最後調用工做線程中的Thread對象的start方法執行任務,其實最終是委託到Worker的下面方法執行:
/** * 執行任務 */ public void run() { runWorker(this); }
runWoker用於執行任務,總體流程以下:
getTask()
方法從隊列中獲取任務(若是工做線程自身攜帶着任務,則執行攜帶的任務);task.run()
執行任務;final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 執行任務的線程 Runnable task = w.firstTask; // 任務, 若是是null則從隊列取任務 w.firstTask = null; w.unlock(); // 容許執行線程被中斷 boolean completedAbruptly = true; // 表示是否由於中斷而致使退出 try { while (task != null || (task = getTask()) != null) { // 當task==null時會經過getTask從隊列取任務 w.lock(); /** * 下面這個if判斷的做用以下: * 1.保證當線程池狀態爲STOP/TIDYING/TERMINATED時,當前執行任務的線程wt是中斷狀態(由於線程池處於上述任一狀態時,均不能再執行新任務) * 2.保證當線程池狀態爲RUNNING/SHUTDOWN時,當前執行任務的線程wt不是中斷狀態 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 鉤子方法,由子類自定義實現 Throwable thrown = null; try { task.run(); // 執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 鉤子方法,由子類自定義實現 } } finally { task = null; w.completedTasks++; // 完成任務數+1 w.unlock(); } } // 執行到此處, 說明該工做線程自身既沒有攜帶任務, 也沒從任務隊列中獲取到任務 completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); // 處理工做線程的退出工做 } }
這裏要特別注意第一個IF方法,該方法的核心做用,用一句話歸納就是:
確保正在中止的線程池(STOP/TIDYING/TERMINATED)再也不接受新任務,若是有新任務那麼該任務的工做線程必定是中斷狀態;確保正常狀態的線程池(RUNNING/SHUTDOWN),其所執行的任務都是不能被中斷的。
另外,getTask方法用於從任務隊列中獲取一個任務,若是獲取不到任務,會跳出while循環,最終會經過processWorkerExit方法清理工做線程。注意這裏的completedAbruptly
字段,它表示該工做線程是不是由於中斷而退出,while循環的退出有如下幾種可能:
processWorkerExit(worker,false);
processWorkerExit(worker,true);
經過上面的討論,咱們知道工做線程是在processWorkerExit中被清理的,來看下定義:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 工做線程因異常狀況而退出 decrementWorkerCount(); // 工做線程數減1(若是工做線程執行時沒有出現異常, 在getTask()方法中已經對線程數減1了) final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // completedTaskCount記錄線程池完成的總任務數 workers.remove(w); // 從工做線程集合中移除(該工做線程會自動被GC回收) } finally { mainLock.unlock(); } tryTerminate(); // 根據線程池狀態, 判斷是否須要終止線程池 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 若是線程池狀態爲RUNNING/SHUTDOWN if (!completedAbruptly) { // 工做線程爲正常退出 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); // 新建一個工做線程 } }
processWorkerExit的做用就是將該退出的工做線程清理掉,而後看下線程池是否須要終止。
processWorkerExit執行完以後,整個工做線程的生命週期也結束了,咱們能夠經過下圖來回顧下它的整個生命週期:
最後,咱們來看下任務的獲取,也就是runWorker中使用的getTask
方法:
private Runnable getTask() { boolean timedOut = false; // 表示上次從阻塞隊列中取任務時是否超時 for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 獲取線程池狀態 /** * 如下IF用於判斷哪些狀況下不容許再從隊列獲取任務: * 1. 線程池進入中止狀態(STOP/TIDYING/TERMINATED), 此時即便隊列中還有任務未執行, 也再也不執行 * 2. 線程池非RUNNING狀態, 且隊列爲空 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 工做線程數減1 return null; } int wc = workerCountOf(c); // 獲取工做線程數 /** * timed變量用於判斷是否須要進行超時控制: * 對於核心線程池中的工做線程, 除非設置了allowCoreThreadTimeOut==true, 不然不會超時回收; * 對於非核心線程池中的工做線程, 都須要超時控制 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 這裏主要是當外部經過setMaximumPoolSize方法從新設置了最大線程數時,須要回收多出的工做線程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; // 超時仍未獲取到任務 } catch (InterruptedException retry) { timedOut = false; } } }
getTask方法的主要做用就是:經過自旋,不斷地嘗試從阻塞隊列中獲取一個任務,若是獲取失敗則返回null。
阻塞隊列就是在咱們構建ThreadPoolExecutor對象時,在構造器中指定的。因爲隊列是外部指定的,因此根據阻塞隊列的特性不一樣,getTask方法的執行狀況也不一樣。咱們曾經在J.U.C之collections框架系列中全面剖析過J.U.C中的全部阻塞隊列:
隊列特性 | 有界隊列 | 近似無界隊列 | 無界隊列 | 特殊隊列 |
---|---|---|---|---|
有鎖算法 | ArrayBlockingQueue | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
無鎖算法 | / | / | LinkedTransferQueue | SynchronousQueue |
咱們能夠根據業務需求、任務特色等選擇上表中的某一種阻塞隊列,根據Oracle官方文檔的提示,任務在阻塞隊列中排隊一共有三種狀況:
1.直接提交
即直接將任務提交給等待的工做線程,這時能夠選擇SynchronousQueue。由於SynchronousQueue是沒有容量的,並且採用了無鎖算法,因此性能較好,可是每一個入隊操做都要等待一個出隊操做,反之亦然。
使用SynchronousQueue時,當核心線程池滿了之後,若是不存在空閒的工做線程,則試圖把任務加入隊列將當即失敗(execute方法中使用了隊列的offer方法進行入隊操做,而SynchronousQueue在調用offer時若是沒有另外一個線程等待出隊操做,則會當即返回false),所以會構造一個新的工做線程(未超出最大線程池容量時)。
因爲,核心線程池是很容易滿的,因此當使用SynchronousQueue時,通常須要將maximumPoolSizes
設置得比較大,不然入隊很容易失敗,最終致使執行拒絕策略,這也是爲何Executors工做默認提供的緩存線程池使用SynchronousQueue做爲任務隊列的緣由。
2.無界任務隊列
無界任務隊列咱們的選擇主要有LinkedTransferQueue、LinkedBlockingQueue(近似無界,構造時不指定容量便可),從性能角度來講LinkedTransferQueue採用了無鎖算法,高併發環境下性能相對更好,但若是隻是作任務隊列使用相差並不大。
使用無界隊列須要特別注意系統資源的消耗狀況,由於當核心線程池滿了之後,會首先嚐試將任務放入隊列,因爲是無界隊列因此幾乎必定會成功,那麼系統瓶頸其實就是硬件了。若是任務的建立速度遠快於工做線程處理任務的速度,那麼最終會致使系統資源耗盡。Executors工廠中建立固定線程池的方法內部就是用了LinkedBlockingQueue。
3.有界任務隊列
有界任務隊列,好比ArrayBlockingQueue ,能夠防止資源耗盡的狀況。當核心線程池滿了之後,若是隊列也滿了,則會建立歸屬於非核心線程池的工做線程,若是非核心線程池也滿了 ,纔會執行拒絕策略。
ThreadPoolExecutor在如下兩種狀況下會執行拒絕策略:
所謂拒絕策略,就是在構造ThreadPoolExecutor時,傳入的RejectedExecutionHandler對象:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
ThreadPoolExecutor一共提供了4種拒絕策略:
1.AbortPolicy(默認)
AbortPolicy策略其實就是拋出一個RejectedExecutionException異常:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
2.DiscardPolicy
DiscardPolicy策略其實就是無爲而治,什麼都不作,等任務本身被回收:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
3.DiscardOldestPolicy
DiscardOldestPolicy策略是丟棄任務隊列中的最近一個任務,並執行當前任務:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 線程池未關閉(RUNNING) e.getQueue().poll(); // 丟棄任務隊列中的最近任務 e.execute(r); // 執行當前任務 } } }
4.CallerRunsPolicy
CallerRunsPolicy策略至關於以自身線程來執行任務,這樣能夠減緩新任務提交的速度。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 線程池未關閉(RUNNING) r.run(); // 執行當前任務 } } }
ExecutorService接口提供兩種方法來關閉線程池,這兩種方法的區別主要在因而否會繼續處理已經添加到任務隊列中的任務。
shutdown方法將線程池切換到SHUTDOWN狀態(若是已經中止,則不用切換),並調用interruptIdleWorkers方法中斷全部空閒的工做線程,最後調用tryTerminate嘗試結束線程池:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 若是線程池爲RUNNING狀態, 則切換爲SHUTDOWN狀態 interruptIdleWorkers(); // 中斷全部空閒線程 onShutdown(); // 鉤子方法, 由子類實現 } finally { mainLock.unlock(); } tryTerminate(); }
這裏要注意,若是執行Runnable任務的線程自己不響應中斷,那麼也就沒有辦法終止任務。
shutdownNow方法的主要不一樣之處就是,它會將線程池的狀態至少置爲STOP,同時中斷全部工做線程(不管該線程是空閒仍是運行中),同時返回任務隊列中的全部任務。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 若是線程池爲RUNNING或SHUTDOWN狀態, 則切換爲STOP狀態 interruptWorkers(); // 中斷全部工做線程 tasks = drainQueue(); // 抽空任務隊列中的全部任務 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
最後,咱們來回顧下ThreadPoolExecutor的總體結構,ThreadPoolExecutor的核心方法是execute,控制着工做線程的建立和任務的執行,以下圖:
同時,ThreadPoolExecutor中有幾個比較重要的組件:阻塞隊列、核心線程池、拒絕策略,它們的關係以下圖,圖中的序號表示execute的執行順序,能夠配合上面的流程圖來理解:
關於ThreadPoolExecutor這個線程池,最重要的是根據系統實際狀況,合理進行線程池參數的設置以及阻塞隊列的選擇。現實狀況下,通常會本身經過ThreadPoolExecutor的構造器去構建線程池,而非直接使用Executors工廠建立,由於這樣更利於對參數的控制和調優。
另外,根據任務的特色,要有選擇的配置核心線程池的大小:
ThreadPoolExecutor到此就介紹完了,下一節咱們將介紹一種可控制任務執行週期的線程池——ScheduledThreadPoolExecutor,其實咱們以前講ScheduledExecutorService接口的時候已經接觸過了,下一節會深刻它的實現原理。