目錄html
線程是併發編程的基礎,前面的文章裏,咱們的實例基本都是基於線程開發做爲實例,而且都是使用的時候就建立一個線程。這種方式比較簡單,可是存在一個問題,那就是線程的數量問題。java
假設有一個系統比較複雜,須要的線程數不少,若是都是採用這種方式來建立線程的話,那麼就會極大的消耗系統資源。首先是由於線程自己的建立和銷燬須要時間,若是每一個小任務都建立一個線程,那麼就會大大下降系統的效率。其次是線程自己也是佔用內存空間的,大量的線程運行會搶佔內存資源,處理不當極可能會內存溢出,這顯然不是咱們想看到的。編程
那麼有什麼辦法解決呢?有一個好的思路就是對線程進行復用,由於全部的線程並不都是同一時間一塊兒運行的,有些線程在某個時刻多是空閒狀態,若是這部分空閒線程能有效利用起來,那麼就能讓線程的運行被充分的利用,這樣就不須要建立那麼多的線程了。咱們能夠把特定數量的線程放在一個容器裏,須要使用線程時,從容器裏拿出空閒線程使用,線程工做完後不急着關閉,而是退回到線程池等待使用。這樣的容器通常被稱爲線程池。用線程池來管理線程是很是有效的方法,用一張圖片能夠簡單的展現出線程池的管理流程:
緩存
Java中也有一套框架來控制管理線程,那就是Executor框架。Executor框架是JDK1.5以後才引入的,位於java.util.cocurrent 包下,能夠經過該框架來控制線程的啓動、執行和關閉,從而簡化併發編程的操做,這是它的核心成員類圖:
多線程
Executor:最上層的接口,定義了一個基本方法execute,接受一個Runnable參數,用來替代一般建立或啓動線程的方法。併發
ExecutorService:繼承自Executor接口,提供了處理多線程的方法。框架
ScheduledExecutorService:定時調度接口,繼承自ExecutorService。ide
AbstractExecutorService:執行框架的抽象類。函數
ThreadPoolExecutor:線程池中最核心的一個類,提供了線程池操做的基本方法。高併發
Executors:線程池工廠類,可用於建立一系列有特定功能的線程池。
以上Executor框架中的基本成員,其中最核心的的成員無疑就是ThreadPoolExecutor,想了解Java中線程池的運行機制,就必須先了解這個類,而最好的瞭解方式無疑就是看源碼。
打開ThreadPoolExecutor的源碼,發現類中提供了四個構造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
能夠看出,ThreadPoolExecutor的構造函數中的參數仍是比較多的,而且最核心的是第四個構造函數,其中完成了底層的初始化工做。
下面解釋一下構造函數參數的含義:
execute
方法提交的任務。經常使用的有三種隊列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。threadFactory:線程工廠,用於建立線程。
handler:拒絕策略,當任務太多來不及處理時所採用的處理策略。
看完了構造函數,咱們來看下ThreadPoolExecutor類中幾個重要的成員變量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl:控制線程運行狀態的一個字段。同時,根據下面的幾個方法runStateOf
,workerCountOf
,ctlOf
能夠看出,該字段還包含了兩部分的信息:線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),而且使用的是Integar類型,高3位保存runState,低29位保存workerCount。
COUNT_BITS:值爲29的常量,在字段CAPACITY
被引用計算。
CAPACITY:表示有效線程數量(workerCount)的上限,大小爲 (1<<29) - 1。
下面5個變量表示的是線程的運行狀態,分別是:
用一個狀態轉換圖表示大概以下 (圖片來源於http://www.javashuo.com/article/p-cholswre-gb.html):
構造函數和基本參數都瞭解後,接下來就是對類中重要方法的研究了。
execute方法
ThreadPoolExecutor類的核心調度方法是execute(),經過調用這個方法能夠向線程池提交一個任務,交由線程池去執行。而ThreadPoolExecutor的工做邏輯也能夠藉由這個方法來一步步理清。這是方法的源碼:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //獲取ctl的值,前面說了,該值記錄着runState和workerCount int c = ctl.get(); /* * 調用workerCountOf獲得當前活動的線程數; * 當前活動線程數小於corePoolSize,新建一個線程放入線程池中; * addWorker(): 把任務添加到該線程中。 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //若是上面的添加線程操做失敗,從新獲取ctl值 c = ctl.get(); } //若是當前線程池是運行狀態,而且往工做隊列中添加該任務 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /* * 若是當前線程不是運行狀態,把任務從隊列中移除 * 調用reject(內部調用handler)拒絕接受任務 */ if (! isRunning(recheck) && remove(command)) reject(command); //獲取線程池中的有效線程數,若是爲0,則執行addWorker建立一個新線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 若是執行到這裏,有兩種狀況: * 1. 線程池已經不是RUNNING狀態; * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。 * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize; * 若是失敗則拒絕該任務 */ else if (!addWorker(command, false)) reject(command); }
簡單歸納一下代碼的邏輯,大概是這樣:
一、判斷當前運行中的線程數是否小於corePoolSize,是的話則調用addWorker建立線程執行任務。
二、不知足1的條件,就把任務放入工做隊列workQueue中。
三、若是任務成功加入workQueue,判斷線程池是不是運行狀態,不是的話先把任務移出工做隊列,並調用reject方法,使用拒絕策略拒絕該任務。線程若是是非運行中,調用addWorker建立一個新線程。
四、若是放入workQueue失敗 (隊列已滿),則調用addWorker建立線程執行任務,若是這時建立線程失敗 (addWorker傳進去的第二個參數值是false,說明這種狀況是當前線程數不小於maximumPoolSize),就會調用reject(內部調用handler)拒絕接受任務。
整個執行流程用一張圖片表示大體以下:
以上就是execute方法的大概邏輯,接下來看看addWorker的方法實現。
addWorker方法
源碼以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /**線程池狀態不爲SHUTDOWN時 * 判斷隊列或者任務是否爲空,是的話返回false */. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); /* 這裏能夠看出core參數決定着活動線程數的大小比較對象 * core爲true表示與 corePoolSize大小進行比較 * core爲false表示與 maximumPoolSize大小進行比較 * 當前活動線程數大於比較對象就返回false */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 嘗試增長workerCount,若是成功,則跳出第一個for循環 if (compareAndIncrementWorkerCount(c)) break retry; // 若是增長workerCount失敗,則從新獲取ctl的值 c = ctl.get(); // Re-read ctl // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立一個worker對象w w = new Worker(firstTask); //實例化w的線程t final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一個HashSet,保存着任務的worker對象 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //啓動線程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
從代碼中能夠看出,addWorker方法的主要工做是在線程池中建立一個新的線程並執行,其中firstTask參數指定的是新線程須要執行的第一個任務,core參數決定於活動線程數的比較對象是corePoolSize仍是maximumPoolSize。根據傳進來的參數首先對線程池和隊列的狀態進行判斷,知足條件就新建一個Worker對象,並實例化該對象的線程,最後啓動線程。
Worker類
根據addWorker源碼中的邏輯,咱們能夠發現,線程池中的每個線程其實都是對應的Worker對象在維護的,因此咱們有必要對Worker類一探究竟,先看一下類的源碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
從Worker類的構造函數能夠看出,當實例化一個Worker對象時,Worker對象會把傳進來的Runnable參數firstTask
賦值給本身的同名屬性,而且用線程工廠也就是當前的ThreadFactory來新建一個線程。
同時,由於Worker實現了Runnable接口,因此當Worker類中的線程啓動時,調用的實際上是run()方法。run方法中調用的是runWorker
方法,咱們來看下它的具體實現:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //獲取第一個任務 Runnable task = w.firstTask; w.firstTask = null; //容許中斷 w.unlock(); // allow interrupts //是否由於異常退出循環的標誌,processWorkerExit方法會對該參數作判斷 boolean completedAbruptly = true; try { //判斷task是否爲null,是的話經過getTask()從隊列中獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /* 這裏的判斷主要邏輯是這樣: * 若是線程池正在中止,那麼就確保當前線程是中斷狀態; * 若是不是的話,就要保證不是中斷狀態 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //用於記錄任務執行前須要作哪些事,屬於ThreadPoolExecutor類中的方法, //是空的,須要子類具體實現 beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
總結一下runWorker方法的運行邏輯:
一、經過while循環不斷地經過getTask()方法從隊列中獲取任務;
二、若是線程池正在中止狀態,確保當前的線程是中斷狀態,不然確保當前線程不中斷;
三、調用task的run()方法執行任務,執行完畢後須要置爲null;
四、循環調用getTask()取不到任務了,跳出循環,執行processWorkerExit()方法。
過完runWorker()的運行流程,咱們來看下getTask()是怎麼實現的。
getTask方法
getTask()方法的做用是從隊列中獲取任務,下面是該方法的源碼:
private Runnable getTask() { //記錄上次從隊列獲取任務是否超時 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //將workerCount減1 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /* timed變量用於判斷線程的操做是否須要進行超時判斷 * allowCoreThreadTimeOut無論它,默認是false * wc > corePoolSize,當前線程是若是大於核心線程數corePoolSize */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* 根據timed變量判斷,若是爲true,調用workQueue的poll方法獲取任務, * 若是在keepAliveTime時間內沒有獲取到任務,則返回null; * timed爲false的話,就調用workQueue的take方法阻塞隊列, * 直到隊列中有任務可取。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //r爲null,說明time爲true,超時了,把timedOut也設置爲true timedOut = true; } catch (InterruptedException retry) { //發生異常,把timedOut也設置爲false,從新跑循環 timedOut = false; } } }
getTask的代碼看上去比較簡單,但其實內有乾坤,咱們來重點分析一下兩個if判斷的邏輯:
一、當進入getTask方法後,先判斷當前線程池狀態,若是線程池狀態rs >= SHUTDOWN,再進行如下判斷:
1)rs 的狀態是否大於STOP;2)隊列是否爲空;
知足以上條件其中之一,就將workerCount減1並返回null,也就是表示隊列中再也不有任務。由於線程池的狀態值是SHUTDOWN以上時,隊列中再也不容許添加新任務,因此上面兩個條件知足一個都說明隊列中的任務都取完了。
二、進入第二個if判斷,這裏的邏輯有點繞,但做用比較重要,是爲了控制線程池的有效線程數量,咱們來具體解析下代碼:
wc > maximumPoolSize
:判斷當前線程數是否大於maximumPoolSize,這種狀況通常不多發生,除非是maximumPoolSize的大小在該程序執行的同時被進行設置,好比調用ThreadPoolExecutor中的setMaximumPoolSize
方法。
timed && timedOut
:若是爲true,表示當前的操做須要進行超時判斷,而且上次從隊列獲取任務已經超時。
wc > 1 || workQueue.isEmpty()
:若是工做線程大於1,或者阻塞隊列是空的。
compareAndDecrementWorkerCount
:比較並將線程池中的workerCount減1
在上文中,咱們解析execute方法的邏輯時瞭解到,若是當前線程池的線程數量超過了corePoolSize且小於maximumPoolSize,而且workQueue已滿時,仍然能夠增長工做線程。
但調用getTask()取任務的過程當中,若是超時沒有獲取到任務,也就是timedOut爲true的狀況,說明workQueue已經爲空了,也就說明了當前線程池中不須要那麼多線程來執行任務了,能夠把多於corePoolSize數量的線程銷燬掉,也就是不斷的讓任務被取出,讓線程數量保持在corePoolSize便可,直到getTask方法返回null。
而當getTask方法返回null後,runWorker方法中就會由於取不到任務而執行processWorkerExit()方法。
processWorkerExit方法
processWorkerExit方法的做用主要是對worker對象的移除,下面是方法的源碼:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //是異常退出的話,執行程序將workerCount數量減1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 從workers的集合中移除worker對象,也就表示着從線程池中移除了一個工做線程 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,從executor方法開始的整個運行過程就完畢了,總結一下該流程:
執行executor --> 新建Worker對象,並實例化線程 --> 調用runWorker方法,經過getTask()獲取任務,並執行run方法 --> getTask()方法中不斷向隊列取任務,並將workerCount數量減1,直至返回null --> 調用processWorkerExit清除worker對象。
用一張流程圖表示以下所示 (圖片來源於http://www.javashuo.com/article/p-cholswre-gb.html):
前面咱們屢次提到了workQueue,這是一個任務隊列,用來存放等待執行的任務,它是BlockingQueue類型的對象,而在ThreadPoolExecutor的源碼註釋中,詳細介紹了三種經常使用的Queue類型,分別是:
SynchronousQueue:直接提交的隊列。這個隊列沒有容量,當接收到任務的時候,會直接提交給線程處理,而不保留它。若是沒有空閒的線程,就新建一個線程來處理這個任務!若是線程數量達到最大值,就會執行拒絕策略。因此,使用這個類型隊列的時候,通常都是將maximumPoolSize通常指定成Integer.MAX_VALUE,避免容易被拒絕。
ArrayBlockingQueue:有界的任務隊列。須要給定一個參數來限制隊列的長度,接收到任務的時候,若是沒有達到corePoolSize的值,則新建線程 (核心線程) 執行任務,若是達到了,則將任務放入等待隊列。若是隊列已滿,則在總線程數不到maximumPoolSize的前提下新建線程執行任務,若大於maximumPoolSize,則執行拒絕策略。
LinkedBlockingQueue:無界的任務隊列。該隊列沒有任務數量的限制,因此任務能夠一直入隊,知道耗盡系統資源。當接收任務,若是當前線程數小於corePoolSize,則新建線程處理任務;若是當前線程數等於corePoolSize,則進入隊列等待。
當線程池的任務隊列已滿而且線程數目達到maximumPoolSize時,對於新加的任務通常會採起拒絕策略,一般有如下四種策略:
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow():
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
代碼邏輯就不一一進行解析了,總結一下兩個方法的特色就是:
ThreadPoolExecutor的運行機制講完了,接下來展現一下如何用ThreadPoolExecutor建立線程池實例,具體代碼以下:
public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); //用lambda表達式編寫方法體中的邏輯 Runnable run = () -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "正在執行"); } catch (InterruptedException e) { e.printStackTrace(); } }; for (int i = 0; i < 10; i++) { service.execute(run); } //這裏必定要作關閉 service.shutdown(); }
上面的代碼中,咱們建立的ThreadPoolExecutor線程池的核心線程數爲5個,因此,當調用線程池執行任務時,同時運行的線程最多也是5個,執行main方法,輸出結果以下:
pool-1-thread-3正在執行 pool-1-thread-1正在執行 pool-1-thread-4正在執行 pool-1-thread-5正在執行 pool-1-thread-3正在執行 pool-1-thread-2正在執行 pool-1-thread-1正在執行 pool-1-thread-4正在執行 pool-1-thread-5正在執行
看到出來,線程池確實只有5個線程在工做,也就是真正的實現了線程的複用,說明咱們的ThreadPoolExecutor實例是有效的。
http://www.javashuo.com/article/p-cholswre-gb.html
http://www.javashuo.com/article/p-zpxgejsk-a.html
《實戰Java:高併發程序設計》