咱們的程序裏,時常要使用多線程。所以多線程的管理變的尤其重要。ThreadPoolExecutor很好的解決了這一點。本篇文章主要從源碼入手,分析ThreadPoolExecutor的原理。算法
###1.標記和構造方法####編程
和不少狀態對象同樣,ThreadPoolExecutor也經過一個int的頭3位來記錄線程池的狀態,後面20多位來標記工做線程數量。而且提供通用的位運算接口來得到你所須要的數據。多線程
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
咱們先來看下ThreadPoolExecutor的構造方法,這裏彷佛咱們又要老生常談了,網上已經有不少關於線程池各個參數的介紹了,這裏,非墨仍是會再說一遍,這樣加深一下你們的印象。oop
public ThreadPoolExecutor(int corePoolSize, //核心線程數量 int maximumPoolSize,//最大線程數量 long keepAliveTime,//存活時間 TimeUnit unit,//存活時間單位 BlockingQueue<Runnable> workQueue,//工做隊列 ThreadFactory threadFactory,//線程構造工廠 RejectedExecutionHandler handler//拒絕請求回調 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
###2.執行流程####post
按照咱們熟知的線程池機制, 1.當請求被post到咱們的線程池中,咱們的線程池會先生成一個核心線程來執行它 2.當核心線程滿了的時候,將會把這個請求放入到咱們的工做請求隊列workQueue中。 3.若是你提供的隊列是一個有界隊列的時候,線程池將會判斷你的最大線程數是否超過你的核心線程。若是超過核心線程的話,線程池會生成新的線程去執行它。 4.若是這個時候,已經達到了最大線程數,那麼線程池將走到拒絕回調 5.若是線程池的最大線程數不大於核心線程數,而且工做隊列已滿,那麼將直接走拒絕回調ui
實際上這個流程已經在ThreadPoolExecutor.execute方法註釋中有詳細的說明。即使沒有說明,咱們也能夠從它的代碼流程簡單看出一些端倪:this
//code ThreadPoolExecutor.execute(Runnable) int c = ctl.get();//獲取當前運行線程數 if (workerCountOf(c) < corePoolSize) {//若是當前線程數小於核心線程數 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//將請求命令加入到工做隊列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false))//加入到非核心線程中 reject(command);//若是非核心線程沒有執行,那麼將走拒絕請求回調
###3.深刻源碼###線程
咱們以execute爲入口,深刻分析一下這個線程池的源碼。int c = ctl.get()方法咱們暫時不說,後面咱們將會補充,咱們暫且把它理解爲得到一個數量,而這個數量c將會傳入到workerCountOf方法中。這個方法名稱咱們就能知道其用意,就是爲了得到當前工做線程數量。code
private static int workerCountOf(int c) { return c & CAPACITY; }
上文咱們說到,線程池會經過一個int的後幾位來記錄線程數量,而workerCountOf就是經過位運算來得到當前工做線程數。在得到當前線程數了之後,若是當前線程數小於 corePoolSize的話,將會經過addWorker方法把command加入到工做線程中。addWorker須要提供兩個參數,一個是你的command,另一個boolean量是爲了標識是不是往core線程中加。對象
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get();//得到一個含有狀態和數量的值 int rs = runStateOf(c);//得到當前線程池狀態 ... for (;;) {//第二個for int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } }
這裏,經過上面的代碼咱們能夠清晰的瞭解ctl變量的存在的目的: 1.首先,當從類型上看clt是一個原子類型,說明它是要支持多線程調用的 2.ctl裏面的值須要存儲兩個信息,一個是線程數量,一個是當前線程池的工做狀態。
這時候是否有讀者還在納悶,爲何個人線程數小於個人核心線程數,我往個人線程池裏加,仍是可能出現加不進去的狀況。事實上,「第二個for」循環已經很好的說明了這一點。由於線程池不能保證是同一個線程調用addWorker方法。線程池須要同步事後,才能保證是不是否往核心線程裏面加。這就是爲何在ThreadPoolExecutor.execute方法裏,在判斷完核心線程數量以後,若是失敗了,還要再取一次當前線程數的緣由。
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get();//再取一次 }
好的,咱們繼續回到"第二個for"。咱們能夠看出,線程池在同步方面不只細化了粒度,並且用的是CAS算法。這種算法能夠勁量的避免因爲sync引發的線程阻塞。
for (;;) {//第二個for int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //當線程池數量超過核心線程的時候退出,返回false if (compareAndIncrementWorkerCount(c)) break retry;//當增長線程成功的時候,跳出循環 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry;//狀態不一致繼續循環 // else CAS failed due to workerCount change; retry inner loop }
因爲咱們如今只有一個線程在工做,不存在多線程競爭的狀況,所以咱們選擇跳出循環的邏輯。跳出循環之後,程序將真正意義上的生成一個Worker線程來執行指令。
//code private boolean addWorker(Runnable firstTask, boolean core) boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//生成一個worker對象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//鎖定全局鎖 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());//檢查線程池狀態 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//將Worker線程歸入workers集合對象管理 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;//從新賦值largestPoolSize變量 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
上面的代碼很是簡單,線程池將生成一個Worker的線程包裝類。不管是是不是核心線程,全部的線程都被歸入到workers集合對象進行管理。若是一切流程都正常workerAdded將爲true,Worker裏的線程將被啓動。啓動後Worker將執行線程的run方法,而在run方法中,又調用到Worker的runWorker(Worker)方法:
public void run() { runWorker(this); }
runworker是真正的線程執行流程的代碼段:
// code runworker try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();//判斷當前線程池狀態, try { beforeExecute(wt, task);//執行前切面 Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//執行後切面 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
runworker方法中引伸出來兩個方法beforeExecute和afterExecute。能夠經過繼承的方式來監控command的執行。至關於在command.run以前和以後切了兩個面,是一種面向方面的編程模式。當Task執行完成以後,因爲while循環,將再次執行while的判斷條件task = getTask()) != null; getTask方法是可能阻塞的,阻塞的時間是根據你在構造線程池的時候設置的超時時間來決定的。
private Runnable getTask() { boolean timedOut = false; //是否判斷超時 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //allowCoreThreadTimeOut變量用於控制是否讓核心線程也進行超時判斷 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ?//經過timed變量來選擇使用poll方法仍是take方法 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;//若是poll獲取的r爲空,標記爲超時 } catch (InterruptedException retry) { timedOut = false; } } } >getTask仍是一個循環操做,第一次執行的時候,會經過timed變量來判斷是否有超時檢查,若是有超時檢查的話將調用poll方法。若是poll在規定的時間內並無得到任何的執行對象,返回的r爲null,timedOut將被標記爲true。這時候,又再次進入循環。這時候,若是你是非核心線程,是擴展線程的話,那麼,if ((wc > maximumPoolSize || (timed && timedOut))這個判斷爲true,程序將返回一個null。 在runWorker方法中,若是getTask返回的對象爲null,runWorker將跳出while循環,執行finally語句:
finally {
processWorkerExit(w, completedAbruptly); }
>processWorkerExit方法須要傳遞兩個變量,第一個變量是Worker對象,第二個變量是completedAbruptly變量,這個變量是幹什麼用的呢?由於你的程序跳出可能存在兩種狀況,一種是正常跳出,一種是異常跳出,若是是異常跳出的話,這個時候你的workercount未必正常的執行decrement操做,所以經過這個變量來標記程序的執行狀態。
//code processWorkerExit final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); }
mainLock是一個全局鎖,主要是爲了同步全局的workers變量。上面的代碼中,線程池將記錄一下task執行數據,而且將worker從workers隊列中刪除。 這個時候,基本上整個線程池的流程都已經概述完了,固然,咱們還確實一個變量,那就是RejectedExecutionHandler類型變量。這個得回到咱們的execute方法:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);//拒絕請求
>當線程池拒絕請求的時候,將調用reject方法,而reject方法將會回調RejectedExecutionHandler的rejectedExecution方法:
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
線程池提供一個默認的拒絕請求回調:
//code ThreadPoolExecutor private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //code AbortPolicy public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
也就是採用異常的方式來拒絕請求。 這樣,ThreadPoolExecutor的主要源碼和結構已經分析完了,固然還有其餘的特性和功能須要看官們本身去探索。 -----非墨