Java裏面線程池頂級接口是Executor,但嚴格意義上講Executor並非一個線程池,而是一個線程執行工具,真正的線程池接口是ExecutorService.關係類圖以下:html
首先Executor的execute方法只是執行一個Runnable任務而已,固然從某種角度上講最後的實現類也是在線程中啓動該任務。java
void execute(Runnable command);
ExecutorService在Executor的基礎上增長了一些任務提交方法:緩存
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
這些方法都是向線程池提交任務,與Executor.execute()不一樣的是:Executor方法中在執行完畢後沒有結果,而ExecutorService.submit()方法在任務執行完畢後會返回一個結果。這在多個線程中傳遞狀態和結果是很是喲用。這三種方法都返回一個Future對象,Future對象能夠阻塞線程知道任務執行完畢(得到結果,若是有的話),也能夠取消任務執行,固然也能檢測任務是不是被取消或者是否執行完畢。併發
在沒有Future以前咱們檢測一個線程是否執行完畢一般使用Thread.join()或者用一個死循環加狀態來描述線程任務執行完畢。如今有了更哈的方法來阻塞線程,檢測任務執行完畢甚至取消執行中的或者未開始執行的任務。高併發
另外ExecutorService加入一些做爲一種服務而具備的服務方法,這也是爲何上面說executor Service纔是做爲線程池接口的真正緣由:工具
List<Runnable> shutdownNow(); void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
其中shutdown,shutdownNow的區別在於,shutdown對應線程池實現類ThreadPoolExecutor內部的SHUTDOWN狀態,該狀態下,線程池會拒絕接受新任務,並繼續運行真在運行和還沒有運行在blocking queue隊列中的任務,最後中止;而shutdownNow對應線程池內部的STOP狀態,該狀態下,線程池拒絕新任務的提交,並嘗試中止真在運行的任務(前提該任務可interrupt),並取消隊列中等待執行的任務,並返回一個爲執行任務list.oop
一:ThreadPoolExecutor構造器ui
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
下面解釋下一下構造器中各個參數的含義:this
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
二:線程池實現原理源碼解析(1.7)atom
1.線程池狀態和線程數分析:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
AtomicInterger類型的ctl狀態變量,用一個integer(32)來包含線程池狀態和數量的表示,高三位爲線程池狀態位,後(2^29)-1 (大約 500 百萬)爲線程數限制,目前夠用,做者Doug lea表示將來擴展可經過long類型來突破線程數大小限制 (atomic類型保證內存可見性的同時,支持CAS無鎖原子性操做,在源碼中處處充斥了該操做,提升併發性);
下面的幾個static final變量表示runState可能的幾個取值。
當建立線程池後,初始時,線程池處於RUNNING狀態;
若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;
若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;
TIDYING:當線程池全部tasks都terminated,而且線程數爲0的狀態,該狀態下會調用terminated()方法,向terminated狀態轉變。
當terminated()方法執行完畢後,線程池被設置爲TERMINATED狀態。
下面是簡單的狀態轉變狀況:
* RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed
下面是ctl變量與狀態,線程數之間包裝和解包裝(一些mask,offset bit操做):
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
狀態判斷方法:
//經過int比較,剛表述了該int由兩部分組成 runState | number ,線程池運行過程當中
//線程數number的變化,並不影響高位狀態runState做爲int的比較.
private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; }
2:任務的執行:
在瞭解將任務提交給線程池到任務執行完畢整個過程以前,咱們先來看一下ThreadPoolExecutor類中其餘的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue;//線程池任務隊列,用於存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock();//用於線程池一些統計信息更新時的鎖,e.g.以下的largestPoolSize,complectedTaskNum,ctl的狀態和線程數更新時。 private final HashSet<Worker> workers = new HashSet<Worker>();//線程池工做集,Runnable的實現,裏面封裝了thread變量成員,用於執行任務 private final Condition termination = mainLock.newCondition();//客戶端調用awaitTerminate()時,將阻塞與此,線程池處於terminate後,會調用condition.signalAll()通知(有time參數的另外考慮)。 private int largestPoolSize;//用於記錄線程池運行過程曾出現過得最大線程數,不一樣於maxiumPoolSize private long completedTaskCount; //用於動態記錄線程池完成的任務數 private volatile ThreadFactory threadFactory;//用於建立新的線程,newThread(Runnable). private volatile RejectedExecutionHandler handler;//拒絕任務提交的處理器 private volatile long keepAliveTime;//當線程數大於corePoolsize時,多出那部分線程處於idle狀態時,最大保活時間。 private volatile boolean allowCoreThreadTimeOut;//當線程數小於corePoolsize時,是否容許其也運用保活時間限制。 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) private volatile int maximumPoolSize;//線程池運行的最大線程數。
每一個變量的做用都已經標明出來了,這裏要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。
corePoolSize在不少地方被翻譯成核心池大小,其實個人理解這個就是線程池的大小。舉個簡單的例子:
假若有一個工廠,工廠裏面有10個工人,每一個工人同時只能作一件任務。
所以只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人作;
當10個工人都有任務在作時,若是還來了任務,就把任務進行排隊等待;
若是說新任務數目增加的速度遠遠大於工人作任務的速度,那麼此時工廠主管可能會想補救措施,好比從新招4個臨時工人進來;
而後就將任務也分配給這4個臨時工人作;
若是說着14個工人作任務的速度仍是不夠,此時工廠主管可能就要考慮再也不接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閒時,而新任務增加的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量忽然過大時的一種補救措施。
不過爲了方便理解,在本文後面仍是將corePoolSize翻譯成核心池大小。
largestPoolSize只是一個用來起記錄做用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關係。
<********************************************進源碼裏面see see*******************************************************************************>
從ThreadPoolExecutor的入口方法execute開始吧,submit()也是封裝下任務仍是調用execute():
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
貼上execute():
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();//讀取ctl變量 if (workerCountOf(c) < corePoolSize) {//當前線程數和corePoolSize比較,當小於時: if (addWorker(command, true)) //addWorker一個new thread來處理該任務(true的狀況),直接返回;若是addWork返回false(線程池被shutdown or shutdownNow;或者同時又別的客戶端提交任務,而且此時線程數大於corePoolSize);繼續往下執行 return; c = ctl.get();//從新讀取ctl變量 } if (isRunning(c) && workQueue.offer(command)) {//對addWorker返回false的狀況進行判斷,當線程池還運行着,說明是由於thread number 大於corePoolSize的狀況,則&&操做第二個表達式把任務添加到workQueue隊列//中. int recheck = ctl.get();//再次讀取ctl,防止併發哦(能夠看出j.u.c做者Doug lea的強大之處,不愧是併發的世界大牛,思惟如此嚴謹) if (! isRunning(recheck) && remove(command)) //吧任務加入隊列的同時,pool被shutdown, 則從隊列中刪除task,而且調用rejectHandler的方法 reject(command); else if (workerCountOf(recheck) == 0) //若是此時線程數爲0(即設置了allowCorePoolSizeTimeOut爲true的狀況),則追加一個new thread(初始任務爲null) addWorker(null, false); } else if (!addWorker(command, false))//對於其餘addWorker()爲false的狀況,即!isRunning和workQueue.offer()失敗的狀況,再次嘗試addWorker()添加new thread,若是爲false,說明pool被關閉或者達到pool飽和,直接reject. reject(command); }
註解中屢次提到addWorker(),而且根據其返回true,false來進一步判斷添加new thread成功或失敗,說明其中也有判斷pool狀態的代碼,爲了完整性,先貼上reject()方法:
final void reject(Runnable command) { handler.rejectedExecution(command, this);//調用rejectHander的處理方法處理。 }
繼續看一下addWorker(),在execute()中,兩種帶參數的addWorker(),addWorker(null, false),addWoker(command,false),addWorker(command,true);走完addWorker代碼,再來具體討論每一種意義:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get();//添加new thread前,仍是先讀取pool ctl變量,判斷一個pool state. 5 int rs = runStateOf(c); 6 //主要檢測pool是否處於 >= shutdown 而且 (queue爲空 || firstTask != null)的狀況。 8 if (rs >= SHUTDOWN && //!(rs == Shutdown && firstTask == null && ! workQueue.isEmpty()) == ( rs != shutdwon || fisrtTask != null || workQueue.isEmpty()) 9 ! (rs == SHUTDOWN && //即條件爲 rs >= shutdown && (rs != shutdown || firstTast != null || workQueue.isEmpty()), 10 firstTask == null && //大概分三種狀況吧: 1,rs >= shutdown && rs != shutdown .說明pool狀態大於shutdown,即爲stop ,terminate,tidiy,此時pool將中止等待執行的任務,和
//試着interrupt真正執行的任務因此不必加入 new thread. 11 ! workQueue.isEmpty())) // 2: rs >= shutdown && firstTask,!= null .即爲狀態大於等於shutdown,客戶端調用了shutdown(),而且此時新添加任務,拒絕false;對execute方法添加任務的二次檢測
// 防止添加任務的同時,pool被shutdown. 12 return false; // 3: rs >= shutdown && workQueue.isEmpty(),即爲狀態大於等於shutdown,而且等待執行的任務爲0,支持也麼必要添加new thread了。 13 //其餘組合方法大概也差很少, 14 for (;;) { //剩下來的狀況爲 1: pool state < shutdown(還運行着) 2 :shutdown狀態,但queue不爲空。 15 int wc = workerCountOf(c); //讀取線程數。 16 if (wc >= CAPACITY || 17 wc >= (core ? corePoolSize : maximumPoolSize)) //若是線程數大於總大小限制2^29-1,或者根據core變量來判斷是否大於 corePoolsize, maixmuPoolSize是否添加thread. 18 return false; 19 if (compareAndIncrementWorkerCount(c)) //cas操做,增長thread++成功,直接跳出外層循環;失敗的話,說明ctl有改變,要麼組成c的state變化,或thread number有變化 20 break retry; 21 c = ctl.get(); // ctl變化時,從新讀取ctl值 22 if (runStateOf(c) != rs)//state變化時,從新從外部循環開始,從新判斷pool狀態,在來判斷。 23 continue retry; 24 // 剩下cas操做失敗因爲thread number變化,從新內部循環,從新讀取workerCount (思惟嚴謹,徹底體如今代碼邏輯組織關係上,膜拜Doug lea) 25 } 26 } 27 //能夠添加的thread的狀況 28 boolean workerStarted = false; 29 boolean workerAdded = false; 30 Worker w = null; 31 try { 32 final ReentrantLock mainLock = this.mainLock; 33 w = new Worker(firstTask);//new thread的封裝 34 final Thread t = w.thread; 35 if (t != null) { //判斷threadFactory.newThread是否爲null。 36 mainLock.lock(); 37 try { 38 41 int c = ctl.get(); //得到lock後,再次檢查ctl state.防止得到mainLock以前,pool關閉, 42 int rs = runStateOf(c); 43 44 if (rs < SHUTDOWN || 45 (rs == SHUTDOWN && firstTask == null)) { 46 if (t.isAlive()) // 檢查thread狀態。 47 throw new IllegalThreadStateException(); 48 workers.add(w); 49 int s = workers.size(); 50 if (s > largestPoolSize)//更新線程池最大線程數 51 largestPoolSize = s; 52 workerAdded = true; //添加成功標誌 53 } 54 } finally { 55 mainLock.unlock(); 56 } 57 if (workerAdded) { 58 t.start();//此時有可能出現OOM異常,workerStart將得不到true賦值。 59 workerStarted = true; 60 } 61 } 62 } finally { 63 if (! workerStarted) //若是出現爲false,OOM異常,或者ThreadFactory返回null thread, 64 addWorkerFailed(w); //清理worker工做 65 } 66 return workerStarted; 67 }
如今來看看execute出現三種addWorker. 1:addWorkder(command,true),以command(runnable)爲參數建立Worker,並以command爲初始執行的任務,true參數爲 根據corePoolSize參數判斷是否添加new worker。
2:addWorker(null, false),實在workercountOf() == 0狀況下添加一個初始任務爲null的worker來執行剛添加到隊列中的任務(可能出現添加任務時,一個線程剛死掉timeOut了) 3:addWorker(command,false),是在添加task到隊列中失敗時,經過addWorker來判斷,若是失敗,說明pool shutdown或者超容量了。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; //在addWorker中也是經過mainLock開鎖定,來實現添加線程和shutdown同步互斥的。 mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
Worker添加失敗的清理工做:
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w);//清除hashset<worker>中的該工做worker. decrementWorkerCount();//workerCount--. tryTerminate();//檢查是否pool termination,防止該worker中有terminate操做。 } finally { mainLock.unlock(); } }
final void tryTerminate() { for (;;) { //轉到terminated狀態的兩種狀況(1:shutdown狀態,queue空,pool空; 2:STOP狀態,pool空) int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //1:當處於running狀態直接返回,不可能有轉向terminated的趨勢, 2: 狀態至少爲TIDYING,此時就差terminated()方法調用或者 return; //已經處於terminated狀態直接返回, 3:處於shutdown,而且隊列不爲空,還得執行隊列中的任務呢,返回。 if (workerCountOf(c) != 0) { // 到這步指的哪些 shutdown & queue.isEmpty() & pool(workerCounter) != 0 和 stop狀態 & pool != 0的狀況,(這兩有轉往terminated狀態的趨勢,只要kill workerHashSet) interruptIdleWorkers(ONLY_ONE);//一個個的清除空閒線程 return; } //到這裏就是上面指的 (shutdown & queue.isEmpty() & workerCounter == 0) || (stop & workerCounter == 0)的狀況 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //置於TIDYING狀態(ctl值) try { terminated();//調用terminated(),轉向terminated狀態 } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll();//通知哪些阻塞awaitTerminate()方法客戶端。 } return; } } finally { mainLock.unlock(); } // 上面cas設置ctl的TIDYING狀態時失敗,for循環操做。 } }
漂亮的cas失敗重試操做,再來看看Worker的代碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable //AQS之後在分析,實現runnable. { volatile long completedTasks;//Worker完成的task數量,用於線程池統計。 Worker(Runnable firstTask) { setState(-1); //初始狀態 -1,防止在調用runWorker(),也就是真正執行task前中斷thread。 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//thread封裝 } public void run() { runWorker(this);//實際的task操做,調用thread pool的loop操做,有初始task,先幹它,而後在不斷循環取隊列中task幹。 }
//0 表明無鎖狀態, 1:上鎖狀態 AQS方法。 protected boolean isHeldExclusively() { return getState() != 0; } //底下一些阻塞,非阻塞上鎖,去鎖操做 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } //用於shutdownNow,中斷當前真在執行的線程,(-1狀態下不可中斷) void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
既然addWorker中start了Worker,就繼續看看真在的run()中的runWork()吧:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); //容許中斷了 boolean completedAbruptly = true;//是否在task運行過程有異常. try { while (task != null || (task = getTask()) != null) {//loop 初始task不爲null,先執行初始task,執行完在loop在隊列中去task來執行 w.lock();//該鎖,用於識別該worker是否爲idle worker,當lock時,說明該worker真在幹活呢,不是idle,和interruptIdleWorkers(boolean)相呼應,interrupt idle worker時,會嘗試tryLock(). if ((runStateAtLeast(ctl.get(), STOP) || // 1: 當pool處於STOP或者更高狀態時,確保該線程被interrupted. (Thread.interrupted() && // 2: 當pool處於STOP之下的狀態時,確保該thread沒有被中斷。a: 當第一個runStateAtLeast(Stop)返會true,直接跳到!wt.isInterrupted()來判斷 runStateAtLeast(ctl.get(), STOP))) && //是否中斷過,若是爲中斷過,結果爲false,直接跳過; b:當沒有中斷過,返回true,中斷該線程; c: 當第一個runstateAtLeast(stop)爲false時,即 !wt.isInterrupted()) //state < stop,未執行的任務得照常執行,執行|| 後面的(),條件2說了確保處於running 或shutdown狀態下,確保該線程未被interrupt過(爲何得 wt.interrupt(); // 確保執行線程未被中斷過呢,由於shutdownNow()方法調用時會嘗試interruptIfStart()方法,而該方法interrupt條件其中之一爲!t.isInterrupted try { // ,就是說若是shutdownNow調用時,真正運行的thread的interrpt標誌已經存在,則不會中斷真在運行的thread.那什麼東西給線程設置的interrupt標誌呢? beforeExecute(wt, task); //大概也許貌似是Future.cancel()。) Throwable thrown = null; try { //beforeExecute(),afterExecute()能夠在task執行先後作一些工做,例如統計,日誌什麼的,看你心情。但若是beforeExecute中throw出異常的話, task.run(); //能夠致使thread game over die的,而且task並不會執行,此時的complectedAbruptly 爲true,標誌死的太忽然, } catch (RuntimeException x) { //task執行可能會跑出異常,三種封裝交給Thread.uncaughtedExceptionHandler處理, thrown = x; throw x; //afterExecute也能拋出異常,致使thread die。 task的異常(有的話)會交給afterExecute,因此能夠作一些記錄啊什麼的。 } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++;// 統計工做 w.unlock(); } } completedAbruptly = false; //這裏有退出方式 1:beforeExecute,afterExecute, task run時throw execption, 這幾種死法對應completedAbruptly 爲true. } finally { // 2:getTask() == null ,退出while loop,此時 completedAbruptly爲false. 不一樣的死法對應了底下processWorkerExit()不一樣的處理。 processWorkerExit(w, completedAbruptly); } }
接着看getTask():
private Runnable getTask() { boolean timedOut = false; //該標籤loop嵌套循環,用於標誌是否在blockingQueue等待超時。 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 檢測state是否大於等於STOP,或者SHUTDWON & taskQueue.isEMpty().這兩種能夠就清場,不幹活了。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); //若是是的話,該thread 準備死了,cas操做,失敗loop重試,減少workerCount,爲它死作好鋪墊嘛。 return null; } boolean timed; //是否有多餘corePoolSize的thread,或者allowCorePoolSize,則爲true(== 0) for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) // wc <= maximunPoolSize && (! timedOut || ! timed) 即線程數大於maxiumPoolSize,或者在keepAlive時間內沒獲得task,而且 break; //timed爲true(即有多餘的線程),則該線程準備也死了; 不然跳出內部循環去等待task. if (compareAndDecrementWorkerCount(c)) //cas操做,失敗loop重試 return null; 返回null c = ctl.get(); if (runStateOf(c) != rs) //狀態改變,外部循環從新開始 continue retry;
//workerCounter的改變,內部loop。
} try { Runnable r = timed ? //若是有多餘的線程(即workercounter > corepoolSize 或者allowCorePoolSizeTimeOut),則等待keepAlive時間返回。好讓他們沒task時,去死嘛!,不然即wc <= corePoolSize & workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // allowCorePoolTimeOut 爲false ,則一直阻塞下去,知道有task. workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
在來看看該死的thread的清場:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 若是死的忽然,即task執行 begin,執行中和,after有異常。 decrementWorkerCount(); //workerCounter --. final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;//統計工做 workers.remove(w);//移除屍體 } finally { mainLock.unlock(); } tryTerminate(); //檢查terminated狀態的趨勢。 int c = ctl.get(); if (runStateLessThan(c, STOP)) { //狀態判斷 ,小於STOP,等待的task仍是得運行的。 if (!completedAbruptly) { //不是異常死的 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 檢查系統最低線程數,1:allowCoreThreadTimeout, 最低爲0 ,2: 爲false,最低爲corePoolSize, if (min == 0 && ! workQueue.isEmpty()) //allowCoreThreadTimeOut爲true, 若是隊列不爲空,最小爲1,(人家還有task要作,又不stop狀態,好歹給人家一個線程運行啊) min = 1; if (workerCountOf(c) >= min) return; // workercounter > min的話,沒必須補充能量 } addWorker(null, false); //補充線程(無初始任務的)來執行隊列中的任務 } }
至此線程池大致流程以結束。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers();//中斷全部線程啊??? tasks = drainQueue();//提取隊列中的任務,返回 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted();//worker的方法,即運行了才interrupt. } finally { mainLock.unlock(); } }
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) //條件判斷,不是一會兒就閉着眼睛就去阻塞等待 return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); //等待,signalAll來拯救 } } finally { mainLock.unlock(); } }
大致完畢,謝謝!