public interface Executor { void execute(Runnable command); }
執行提交的Runnable任務。其中的execute方法在未來的某個時候執行給定的任務,該任務能夠在新線程、池化線程或調用線程中執行,具體由Executor的實現者決定。java
ExecutorService繼承自Executor,下面挑幾個方法介紹:併發
void shutdown();
啓動有序關閉線程池,在此過程當中執行先前提交的任務,但不接受任何新任務。若是線程池已經關閉,調用此方法不會產生額外的效果。此方法不等待之前提交的任務完成執行,可使用awaitTermination去實現。異步
List<Runnable> shutdownNow();
嘗試中止全部正在積極執行的任務, 中止處理等待的任務,並返回等待執行的任務列表。 此方法不等待之前提交的任務完成執行,可使用awaitTermination去實現。除了盡最大努力中止處理積極執行的任務外,沒有任何保證。例如,典型的實現是:經過Thread#interrupt取消任務執行,可是任何未能響應中斷的任務均可能永遠不會終止。函數
boolean isShutdown();
返回線程池關閉狀態。oop
boolean isTerminated();
若是關閉後全部任務都已完成,則返回 true。注意,除非首先調用了shutdown或shutdownNow,不然isTerminated永遠不會返回true。測試
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
線程阻塞阻塞,直到全部任務都在shutdown請求以後執行完畢,或者超時發生,或者當前線程被中斷(以先發生的狀況爲準)。this
<T> Future<T> submit(Callable<T> task);
提交一個value-returning任務以執行,並返回一個表示該任務未決結果的Future。 Future的 get方法將在成功完成任務後返回任務的結果。線程
安排命令在給定的延遲以後運行,或者按期執行,繼承自ExecutorService接口由如下四個方法組成:設計
//在給定延遲以後啓動任務,返回ScheduledFuture public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit); //建立並執行一個週期性操做,該操做在給定的初始延遲以後首次啓動,而後在給定的週期內執行; //若是任務的任何執行遇到異常,則禁止後續執行。不然,任務只會經過執行器的取消或終止而終止。 //若是此任務的任何執行時間超過其週期,則後續執行可能會延遲開始,但不會併發執行。 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit); //建立並執行一個週期性操做,該操做在給定的初始延遲以後首次啓動,而後在一次執行的終止和下一次執行的開始之間使用給定的延遲。 //若是任務的任何執行遇到異常,則禁止後續執行。不然,任務只會經過執行器的取消或終止而終止。 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
public interface ThreadFactory { Thread newThread(Runnable r); }
按需建立新線程的對象。日誌
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
返回任務結果也可能拋出異常。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
Future表示異步計算的結果。方法用於檢查計算是否完成,等待計算完成並檢索計算結果。只有當計算完成時,纔可使用方法get檢索結果,若是須要,能夠阻塞,直到準備好爲止。取消由cancel方法執行。還提供了其餘方法來肯定任務是否正常完成或被取消。一旦計算完成,就不能取消計算。
public interface Delayed extends Comparable<Delayed> { //在給定的時間單位中返回與此對象關聯的剩餘延遲 long getDelay(TimeUnit unit); }
一種混合風格的接口,用於標記在給定延遲以後應該執行的對象。
public interface ScheduledFuture<V> extends Delayed, Future<V> {}
新任務進來時:
構造方法:
public ThreadPoolExecutor( int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
參數說明:
有4個ThreeadPoolExecutor內部類。
最好自定義飽和策略,實現RejectedExecutionHandler接口,如:記錄日誌或持久化存儲不能處理的任務。
線程池的內部狀態由AtomicInteger修飾的ctl表示,其高3位表示線程池的運行狀態,低29位表示線程池中的線程數量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
主池控制狀態ctl是一個原子整數,包含兩個概念字段:
爲了將這兩個字段打包成一個整型,因此將workerCount限制爲(2^29)-1個線程,而不是(2^31)-1個線程。
workerCount是工做線程數量。該值可能與實際活動線程的數量存在暫時性差別,例如,當ThreadFactory在被請求時沒法建立線程,以及退出的線程在終止前仍在執行bookkeeping時。 用戶可見的池大小報告爲工做線程集的當前大小。
runState提供了生命週期,具備如下值:
爲了容許有序比較,這些值之間的數值順序很重要。運行狀態會隨着時間單調地增長,但不須要達到每一個狀態。轉換:
當狀態達到TERMINATED時,在awaitTermination()中等待的線程將返回。
下面看如下其餘狀態信息:
//Integer.SIZE爲32,COUNT_BITS爲29 private static final int COUNT_BITS = Integer.SIZE - 3; //2^29-1 最大線程數 private static final int CAPACITY = (1 << COUNT_BITS) - 1; /** * 即高3位爲111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務; * 111 0 0000 0000 0000 0000 0000 0000 0000 * -1 原碼:0000 ... 0001 反碼:1111 ... 1110 補碼:1111 ... 1111 * 左移操做:後面補 0 * 111 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int RUNNING = -1 << COUNT_BITS; /** * 即高3位爲000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務; * 000 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int SHUTDOWN = 0 << COUNT_BITS; /** * 即高3位爲001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,並且會中斷正在* 運行的任務; * 001 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int STOP = 1 << COUNT_BITS; /** * 即高3位爲010,全部任務都已終止,workerCount爲零,過渡到狀態TIDYING的線程將運行terminated()鉤子方法; * 010 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int TIDYING = 2 << COUNT_BITS; /** * 即高3位爲011,terminated()方法執行完畢; * 011 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int TERMINATED = 3 << COUNT_BITS; //根據ctl計算runState private static int runStateOf(int c) { //2^29 = 001 0 0000 0000 0000 0000 0000 0000 0000 //2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111 //~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000 //假設c爲 STOP 001 0 0000 0000 0000 0000 0000 0000 0000 // 最終值: 001 0 0000 0000 0000 0000 0000 0000 0000 return c & ~CAPACITY; } //根據ctl計算 workerCount private static int workerCountOf(int c) { //2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111 //假設c = 000 0 0000 0000 0000 0000 0000 0000 0001 1個線程 //最終值: 000 0 0000 0000 0000 0000 0000 0000 0001 1 return c & CAPACITY; } // 根據runState和workerCount計算ctl private static int ctlOf(int rs, int wc) { //假設 rs: STOP 001 0 0000 0000 0000 0000 0000 0000 0000 //假設 wc: 000 0 0000 0000 0000 0000 0000 0000 0001 1個線程 //最終值: 001 0 0000 0000 0000 0000 0000 0000 0001 return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } //RUNNING狀態爲負數,確定小於SHUTDOWN,返回線程池是否爲運行狀態 private static boolean isRunning(int c) { return c < SHUTDOWN; } //試圖增長ctl的workerCount字段值。 private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } //嘗試減小ctl的workerCount字段值。 private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } //遞減ctl的workerCount字段。這隻在線程忽然終止時調用(請參閱processWorkerExit)。在getTask中執行其餘遞減。 private void decrementWorkerCount() { do { } while (!compareAndDecrementWorkerCount(ctl.get())); }
Doug Lea大神的設計啊,感受計算機的基礎真的是數學。
Worker繼承了AbstractQueuedSynchronizer,而且實現了Runnable接口。 維護瞭如下三個變量,其中completedTasks由volatile修飾。
//線程這個工做程序正在運行。若是工廠失敗,則爲空。 final Thread thread; //要運行的初始任務。多是null。 Runnable firstTask; //線程任務計數器 volatile long completedTasks;
構造方法:
//使用ThreadFactory中給定的第一個任務和線程建立。 Worker(Runnable firstTask) { //禁止中斷,直到運行工做程序 setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
既然實現了Runnable接口,必然實現run方法:
//Delegates main run loop to outer runWorker public void run() { //核心 runWorker(this); }
先看一眼執行流程圖,再看源碼,會更清晰一點:
首先來看runWorker(Worker w)源碼:
final void runWorker(Worker w) { //獲取當前線程 Thread wt = Thread.currentThread(); //獲取第一個任務 Runnable task = w.firstTask; //第一個任務位置置空 w.firstTask = null; //由於Worker實現了AQS,此處是釋放鎖,new Worker()是state==-1,此處是調用Worker類的 release(1)方法,將state置爲0。Worker中interruptIfStarted()中只有state>=0才容許調用中斷 w.unlock(); //是否忽然完成,若是是因爲異常致使的進入finally,那麼completedAbruptly==true就是忽然完成的 boolean completedAbruptly = true; try { //先處理firstTask,以後依次處理其餘任務 while (task != null || (task = getTask()) != null) { //獲取鎖 w.lock(); //若是池中止,確保線程被中斷;若是沒有,請確保線程沒有中斷。這須要在第二種狀況下從新檢查,以處理清除中斷時的shutdownNow競爭 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //自定義實現 beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //自定義實現 afterExecute(task, thrown); } } finally { task = null; //任務完成數+1 w.completedTasks++; //釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { //Worker的結束後的處理工做 processWorkerExit(w, completedAbruptly); } }
下面再來看上述源碼中的getTask()與processWorkerExit(w, completedAbruptly)方法:
根據當前配置設置執行阻塞或定時等待任務,或者若是該worker由於任何緣由必須退出,則返回null,在這種狀況下workerCount將遞減。
返回空的狀況:
private Runnable getTask() { // Did the last poll() time out? boolean timedOut = false; for (; ; ) { //獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); //僅在必要時檢查隊列是否爲空。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //遞減ctl的workerCount字段 decrementWorkerCount(); return null; } //獲取workerCount數量 int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //線程超時控制 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //嘗試減小ctl的workerCount字段 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //若是有超時控制,則使用帶超時時間的poll,不然使用take,沒有任務的時候一直阻塞,這兩個方法都會拋出InterruptedException Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take(); //有任務就返回 if (r != null) return r; //獲取任務超時,確定是走了poll邏輯 timedOut = true; } catch (InterruptedException retry) { //被中斷 timedOut = false; } } }
爲垂死的worker進行清理和bookkeeping。僅從工做線程調用。除非completedAbruptly被設置,不然假定workerCount已經被調整以考慮退出。此方法從工做集中移除線程,若是線程池因爲用戶任務異常而退出,或者運行的工做池小於corePoolSize,或者隊列非空但沒有工做池, 則可能終止線程池或替換工做池。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // If abrupt, then workerCount wasn't adjusted // true:用戶線程運行異常,須要扣減 // false:getTask方法中扣減線程數量 if (completedAbruptly) //遞減ctl的workerCount字段。 decrementWorkerCount(); //獲取主鎖,鎖定 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //更新完成任務計數器 completedTaskCount += w.completedTasks; //移除worker workers.remove(w); } finally { //解鎖 mainLock.unlock(); } // 有worker線程移除,多是最後一個線程退出須要嘗試終止線程池 tryTerminate(); int c = ctl.get(); // 若是線程爲running或shutdown狀態,即tryTerminate()沒有成功終止線程池,則判斷是否有必要一個worker if (runStateLessThan(c, STOP)) { // 正常退出,計算min:須要維護的最小線程數量 if (!completedAbruptly) { // allowCoreThreadTimeOut 默認false:是否須要維持核心線程的數量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 若是min ==0 或者workerQueue爲空,min = 1 if (min == 0 && !workQueue.isEmpty()) min = 1; // 若是線程數量大於最少數量min,直接返回,不須要新增線程 if (workerCountOf(c) >= min) return; // replacement not needed } // 添加一個沒有firstTask的worker addWorker(null, false); } }
提交有兩種:
任務執行流程圖:
三步處理:
public void execute(Runnable command) { //任務爲空,拋出異常 if (command == null) throw new NullPointerException(); //獲取線程控制字段的值 int c = ctl.get(); //若是當前工做線程數量少於corePoolSize(核心線程數) if (workerCountOf(c) < corePoolSize) { //建立新的線程並執行任務,若是成功就返回 if (addWorker(command, true)) return; //上一步失敗,從新獲取ctl c = ctl.get(); } //若是線城池正在運行,且入隊成功 if (isRunning(c) && workQueue.offer(command)) { //從新獲取ctl int recheck = ctl.get(); //若是線程沒有運行且刪除任務成功 if (!isRunning(recheck) && remove(command)) //拒絕任務 reject(command); //若是當前的工做線程數量爲0,只要還有活動的worker線程,就能夠消費workerQueue中的任務 else if (workerCountOf(recheck) == 0) //第一個參數爲null,說明只爲新建一個worker線程,沒有指定firstTask addWorker(null, false); } else if (!addWorker(command, false)) //若是線程池不是running狀態 或者 沒法入隊列,嘗試開啓新線程,擴容至maxPoolSize,若是addWork(command, false)失敗了,拒絕當前command reject(command); }
下面詳細看一下上述代碼中出現的方法:addWorker(Runnable firstTask, boolean core)。
檢查是否能夠根據當前池狀態和給定的界限(核心或最大值)添加新worker,若是是這樣,worker計數將相應地進行調整,若是可能,將建立並啓動一個新worker, 並將運行firstTask做爲其第一個任務。 若是池已中止或有資格關閉,則此方法返回false。若是線程工廠在被請求時沒有建立線程,則返回false。若是線程建立失敗,要麼是因爲線程工廠返回null,要麼是因爲異常 (一般是Thread.start()中的OutOfMemoryError)),咱們將回滾。
private boolean addWorker(Runnable firstTask, boolean core) { //很久沒見過這種寫法了 retry: //線程池狀態與工做線程數量處理,worker數量+1 for (; ; ) { //獲取當前線程池狀態與線程數 int c = ctl.get(); //獲取當前線程池狀態 int rs = runStateOf(c); // 僅在必要時檢查隊列是否爲空。若是池子處於SHUTDOWN,STOP,TIDYING,TERMINATED的時候 不處理提交的任務,判斷線程池是否能夠添加worker線程 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; //線程池處於工做狀態 for (; ; ) { //獲取工做線程數量 int wc = workerCountOf(c); //若是線程數量超過最大值或者超過corePoolSize或者超過maximumPoolSize 拒絕執行任務 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //試圖增長ctl的workerCount字段 if (compareAndIncrementWorkerCount(c)) //中斷外層循環 break retry; // Re-read ctl c = ctl.get(); //若是當前線程池狀態已經改變 if (runStateOf(c) != rs) //繼續外層循環 continue retry; //不然CAS因workerCount更改而失敗;重試內循環 } } //添加到worker線程集合,並啓動線程,工做線程狀態 boolean workerStarted = false; boolean workerAdded = false; //繼承AQS並實現了Runnable接口 Worker w = null; try { //將任務封裝 w = new Worker(firstTask); //獲取當前線程 final Thread t = w.thread; if (t != null) { //獲取全局鎖 final ReentrantLock mainLock = this.mainLock; //全局鎖定 mainLock.lock(); try { //持鎖時從新檢查。退出ThreadFactory故障,或者在獲取鎖以前關閉。 int rs = runStateOf(ctl.get()); //若是當前線程池關閉了 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //測試該線程是否活動。若是線程已經啓動而且尚未死,那麼它就是活的。 if (t.isAlive()) throw new IllegalThreadStateException(); //入工做線程池 workers.add(w); int s = workers.size(); //跟蹤最大的池大小 if (s > largestPoolSize) largestPoolSize = s; //狀態 workerAdded = true; } } finally { //釋放鎖 mainLock.unlock(); } //若是工做線程加入成功,開始線程的執行,並設置狀態 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //判斷工做線程是否啓動成功 if (!workerStarted) //回滾工做線程建立 addWorkerFailed(w); } //返回工做線程狀態 return workerStarted; }
再分析回滾工做線程建立邏輯方法:addWorkerFailed(w)。 回滾工做線程建立,若是存在,則從worker中移除worker, 遞減ctl的workerCount字段。,從新檢查終止,以防這個worker的存在致使終止。
private void addWorkerFailed(Worker w) { //獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //若是存在,則從worker中移除worker if (w != null) workers.remove(w); //遞減ctl的workerCount字段。 decrementWorkerCount(); //從新檢查終止 tryTerminate(); } finally { mainLock.unlock(); } }
其中的tryTerminate()方法: 若是是SHUTDOWN或者STOP 且池子爲空,轉爲TERMINATED狀態。若是有條件終止,可是workerCount不爲零,則中斷空閒worker,以確保關機信號傳播。必須在任何可能使終止成爲可能的操做以後調用此方法--在關機期間減小worker數量或從隊列中刪除任務。該方法是非私有的,容許從ScheduledThreadPoolExecutor訪問。
final void tryTerminate() { for (; ; ) { int c = ctl.get(); //若是線程池處於運行中,或者阻塞隊列中仍有任務,返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; //還有工做線程 if (workerCountOf(c) != 0) { //中斷空閒工做線程 interruptIdleWorkers(ONLY_ONE); return; } //獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //設置ctl狀態TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //方法在執行程序終止時調用,默認什麼都不執行 terminated(); } finally { //完成terminated()方法,狀態爲TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //喚醒全部等待條件的節點 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } //方法在執行程序終止時調用,默認什麼都不執行 protected void terminated() {}
爲給定的命令調用被拒絕的執行處理程序。
final void reject(Runnable command) { handler.rejectedExecution(command, this); }