剛開始新學一門框架,直接看源碼是比較痛苦的,也諮詢過一些前輩,怎樣看源碼,他們說從「入口」看,那麼什麼是入口呢?我摸索了好久,我認爲入口是如下兩個java
initMethod
,通常這裏會作一些變量的賦值,或者起一些服務。這是我總結的兩個「入口」,也是我看源碼的主線,有更好的經驗能夠一塊兒交流。下面我將從這兩條主線來開始源碼的分析。編程
看一下構造器的內容併發
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 ||maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
線程池的初始化須要7個參數,看過上一篇文章的朋友應該對這些參數很熟悉了,在此就不贅述了。框架
在上一篇文章(2.5 入隊)中,咱們知道了當提交一個新任務,什麼狀況下會新建線程,何時塞入工做隊列,什麼狀況下會拒絕任務。請務必提早了解基礎概念,再看下文的源碼說明:ide
首先,咱們要了解下ctl變量表示什麼,代碼中會大量用到。ctl變量是一個AtomicInteger
,就意味着,ctl是原子性,一個int有32位,前3位表示當前線程池的狀態(RUNNING,STOP…),後29位表示當前的線程數(具體實現能夠百度,這裏不是重點),爲何要這樣作呢,由於狀態和線程數是兩個變量,而且這兩個變量的關係是息息相關的,若是分開賦值,那麼將沒法保證原子性。若是要保證原子性,就得上鎖,這樣會大大下降性能。AtomicInteger
類型作到了既保證了原子性,也無需上鎖,是否是很方便(以前我也不理解爲何,直到看到一篇博文,比較贊同這種說法,若是有不一樣意見,能夠一塊兒討論哈)性能
public void execute(Runnable command) { if (command == null) throw new NullPointerException() //先獲取ctl int c = ctl.get(); //1.若是池的工做線程數小於core size,就新建線程來處理這個任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //若是新建線程失敗,就更新ctl c = ctl.get(); } //2.若是池的工做線程數大於等於core size,而且線程池的狀態是RUNNING, //而且能夠塞進工做隊列,就再從新拉一下ctl,作recheck //2.1若是線程池的狀態不是RUNNING,就從隊列中刪除這個任務,再拒絕這個任務 //2.2若是線程池的狀態是RUNNING,線程池又沒有線程在運行,就新建一個工做線程來處理隊列裏的任務 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3.若是池的工做線程數大於等於core size,但沒法塞入工做隊列, //那麼就新建工做線程來處理這個任務,若是無法新建線程, //意味着這個線程數SHUT DOWN,或者線程池飽和了,咱們就拒絕這個任務 else if (!addWorker(command, false)) reject(command); }
新建線程主要在addWorker
這個方法中完成,若是傳入task,表示新建的線程直接來處理這個新task,若是傳入的是null,那麼這個新建線程就是來處理工做隊列裏的任務。新建線程主要分爲3步:this
1)不停地check線程狀態和線程數,而後將線程數+1(修改ctl)。由於以前的全部操做都沒有上鎖,直到修改線程數以前,任何線程都有機會修改ctl。2)新建包裝類Worker。在此先認爲Worker包裝了一個Thread,一個處理任務的勞動力,詳細的後面再說。而後將worker加入workers。spa
3)啓動worker。線程
private boolean addWorker(Runnable firstTask, boolean core) { //第一步: retry: for (;;) { //再次拉取最新的線程池狀態 int c = ctl.get(); int rs = runStateOf(c); //若是線程池的狀態是RUNNING,或者雖然是SHUTDOWN的狀態,但firstTask是空,工做隊列非空 //(說明這個方法的調用是爲了增長工做線程來處理隊列任務),那就繼續,不然返回false,也就是addWorker失敗 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())) return false; for (;;) { //獲取線程數,根據傳入的core來和core size/max size作對比,好比在上一步提交任務的時候, //當前線程數 < core size,因而以新建線程來處理新任務的模式傳參給addWorker(), //如今發現線程數變成 >= core size了,固然是不能繼續下去了,以此類推。 int wc = workerCountOf(c); if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)) return false; //上一步recheck沒問題,那麼就將線程數 +1,跳出全部循環 if (compareAndIncrementWorkerCount(c)) break retry; //線程數+1失敗,說明有其餘線程改變了這個線程數,須要再次判斷當前線程池的狀態和線程數 c = ctl.get(); // Re-read ctl //線程狀態若是沒變,咱們就只重複當前循環check線程數,若是狀態也變了, //咱們就要跳到retry標誌的位置,也就是最外層的循環裏,從新check 線程池的狀態和線程數。 if (runStateOf(c) != rs) continue retry; } } //第二步: boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //在此先認爲Worker包裝了一個Thread,一個處理任務的勞動力,詳細的後面再說, //這裏須要注意的是,firstTask和thread沒有關係。 w = new Worker(firstTask); final Thread t = w.thread; //若是勞動力建立成功 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //如今咱們拿到了mainLock的鎖了,由於接下來,咱們要處理workers,在此以前, //須要再次確認線程數與狀態,以前都沒上鎖,萬一線程池被停了呢? int rs = runStateOf(ctl.get()); //若是狀態是RUNNING或者雖然狀態是SHUTDOWN,可是沒傳入新任務. if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) { //若是線程已經開始跑了(注意,咱們尚未手動啓動線程,執行start方法),就拋出異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //加入到workers中,workers是一個hashset,裏面儲存了這些幹活勞動力 //而後更新largestPoolSize,就釋放鎖啦 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //第三步: //若是一切順利,線程數改了,勞動力也添加成功了,就來啓動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //若是線程啓動失敗,那就回滾以前所作的一些修改操做,並檢查,是否是須要中止這個線程池 if (! workerStarted) addWorkerFailed(w); } //返回新線程的啓動結果 return workerStarted; }
在上一步,咱們知道了處理任務的勞動力Worker,這裏簡單介紹一下Worker,詳細地不說,這裏咱們只瞭解在整個流程中用到的部分。調試
首先Worker實現了Runnable
接口,繼承自AbstractQueuedSynchronizer
,其中有兩個變量Thread thread
和Runnable firstTask
,首先,咱們來看下初始化:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
firstTask是咱們在一開始就提交的任務,若是咱們addWorker()
傳入的是null,那麼這個firstTask也是null。thread就是線程自己,ThreadFactory
可由線程池初始化的時候傳入自定義的工廠,若是沒有,線程池內部會有一個默認的工廠,默認狀況下,會new Thread()
。
這裏引伸一下,自定義的工廠類能夠作什麼呢?雖然咱們能夠藉此修改線程優先級或者守護狀態,可是不建議你這麼作。經常使用的作法,能夠經過自定義工廠傳入自定義的Thread,這裏咱們能夠定製線程的行爲,好比,修改線程的名字,設置自定義UncaughtExceptionHandler
向Logger
寫入信息,維護一些統計信息(包括有多少線程被建立和銷燬),以及在線程被建立或終止時把調試消息寫入日誌。---------------摘自《Java併發編程實戰》
如今咱們來看一下Worker的run()
方法,爲何會調用run()
方法呢?上一步啓動線程不是使用thread.start()
方法嗎?咱們來看一下這段註釋(來自thread.run()
):
/** * If this thread was constructed using a separate * <code>Runnable</code> run object, then that * <code>Runnable</code> object's <code>run</code> method is called; * otherwise, this method does nothing and returns. * <p> * Subclasses of <code>Thread</code> should override this method. * * @see #start() * @see #stop() * @see #Thread(ThreadGroup, Runnable, String) */ @Override public void run() { if (target != null) { target.run(); } }
也就是,若是Thread
對象使用了Runnable
作初始化,那麼在調用Thread.run()
的時候,會調用這個Runnable
對象的run
方法,還記得在Worker類的構造器中的 this.thread = getThreadFactory().newThread(this);
這句嗎?所以,當啓動工做線程的時候,咱們實際是運行的Worker.run()
。
Worker.run()
方法內部只有一句「runWorker(this)
」,因此咱們直接來看runWorker()
方法:
這個方法的主要任務,就是不斷從隊列中領取任務,運行任務。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //若是這個線程在構造的時候沒有傳入task,而且也沒辦法經過getTask()拿到新任務,那麼這個線程就被回收了 //getTask()在這個能夠認爲是從隊列中領取任務,具體實現,下文再說 while (task != null || (task = getTask()) != null) { //在運行任務前,咱們須要上鎖,除非線程池STOP了,不然不容許打斷這個線程 w.lock(); // 若是線程池的狀態是STOP了,確認當前線程是否被打斷,若是沒有被打斷,就打斷這個線程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //beforeExecute方法拋出的異常會致使線程回收 beforeExecute(wt, task); Throwable thrown = null; try { //run task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //運行中的任務若是拋出異常,雖然咱們會將異常傳入Thrown,可是也會致使線程的回收 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
由上面的代碼能夠看出,一個工做線程的大體工做流程是,首先看這個線程初始化的時候有沒有傳入task,沒有的話,就經過getTask()
方法嘗試獲取task。若是拿到這個task,就會先上鎖,而後執行task.run()
,若是沒拿到,那麼這個線程就會經過processWorkerExit()
回收掉。
首先,咱們看下getTask()
方法,有的人可能以爲無非是從隊列裏面取任務,的確這個方法的主要任務是這個,可是要記住線程池大部分的狀況下是不上鎖的,因此咱們要常常檢查線程池的狀態和線程數,還記得在初始化線程池的時候傳入的keep-alive time嗎?這裏就用到了這個參數。
咱們先看一下注釋:
getTask()
方法是用於經過阻塞或者等待時間來獲取任務,至於選擇哪一種模式根據配置而定。getTask()
方法在如下4種狀況下會返回null(也就是這個工做線程會被回收):
1)若是當前線程數大於max size(可能由於setMaximumPoolSize
方法改變了線程池的max size)。
2)線程池停下了(STOP)。
3)線程池SHUTDOWN而且隊列空了。
4)線程等待超時(設置了allowCoreThreadTimeOut或者線程數 > core size)
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //檢查是否符合第2,3種狀況,是就刪除線程數,返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //檢查是否符合第4種狀況 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //檢查是否符合第1種狀況 或者這個線程以前嘗試過獲取任務而且符合第四種狀況。 //若是當前線程數> 1或者隊列已經沒有任務了,就刪除線程數,返回null if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //若是設置了allowCoreThreadTimeOut或者當前線程數 > core size,就使用poll方法拿任務 //不然就用take方法阻塞拿任務 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //若是poll方法超時,沒拿到任務,就將timedOut設置爲true timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
接下來,咱們看下,若是一個Worker被回收以前會接受怎樣的處理:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //若是是因異常而被回收的線程,那就沒有經歷過線程數的-1的處理,這裏補充完成 if (completedAbruptly) decrementWorkerCount(); //接着從workers中刪除這個勞動力,和添加同樣,須要上鎖。而且記錄總的完成任務數 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //嘗試終止這個線程池,上文提到幾種終止線程池的狀況, //這個方法就是判斷此刻線程池的狀態是否須要終止, //線程被回收有可能意味着要終止線程,因此此處須要判斷,詳細的後文再說 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { //若是是線程是正常退出 if (!completedAbruptly) { //獲取線程池容許的最小線程數 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; //若是當前線程數不小於規定的最小值,就讓這個線程正常退出 if (workerCountOf(c) >= min) return; // replacement not needed } //若是這個勞動力不是正常退出,或者即便是正常退出,可是當前線程數小於規定的最小值, //就補充一個勞動力去處理隊列的任務 addWorker(null, false); } }
總結一下:
上文提到,咱們有四種拒絕任務的策略,如今咱們具體來看一看:
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
reject()
方法內部調用的handler.rejectedExecution()
方法根據handler的不一樣而具體實現也不相同。
1)AbortPolicy:默認策略,直接拋出異常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } 2)CallerRunsPolicy:調用execute(),嘗試將任務丟給線程池處理的線程自己處理任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } 3)DiscardOldestPolicy:將隊列頭部的舊任務丟掉,而後再次嘗試將新任務丟入線程池:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } 4)DiscardPolicy:忽略這個任務,空方法。
前文提到tryTerminate()
方法是用於檢測線程池的當前狀態是否是達到終止條件,若是符合條件,就終止這個線程池。咱們先看下哪些方法調用了tryTerminate()
:
這些方法都有可能致使線程池的終止,如今,咱們來看下tryTerminate
方法的具體實現:
final void tryTerminate() { for (;;) { int c = ctl.get(); //檢查線程池的狀態,若是線程池RUNING,或者是TIDYING,TERMINATED狀態 //或者是SHUTDOWN狀態,可是隊列非空,就退出 if (isRunning(c) ||runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //若是符合終止的條件,可是線程池還有線程,因而打斷其中一個線程,返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //若是符合終止條件,而且線程池已經沒有線程池了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //先將狀態改成TIDYING,執行terminated方法(可被改寫,默認爲空),再將狀態改成TERMINATED if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
不管是shutdown()
仍是shutdownNow()
方法內部都是使用tryTerminate()
來終止線程池,因此在此就再也不贅述。