隨着業務的發展,單線程已經遠遠不能知足,隨即就有多線程的出現。多線程雖然能解決單線程解決不了的事情,可是它也會給你帶來額外的問題。好比成千上萬甚至上百萬的線程時候,你係統就會出現響應延遲、卡機、甚至直接卡死的狀況。爲何會出現這樣的緣由呢?由於爲每一個請求建立一個新線程的開銷很大:在建立和銷燬線程上花費的時間和消耗的系統資源要比花在處理實際的用戶請求的時間和資源更多。html
除了建立和銷燬線程的開銷以外,活動的線程也消耗系統資源。在一個 JVM裏建立太多的線程可能會致使系統因爲過分消耗內存而用完內存或「切換過分」。因此爲了防止資源不足,服務器應用程序須要一些辦法來限制任何給定時刻處理的請求數目。而線程池爲線程生命週期開銷問題和資源不足問題提供瞭解決方案。java
一、下降資源消耗,防止資源不足。合理配置線程池中的線程大小,防止請求線程猛增;另外經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
二、提升響應速度。線程池能夠經過對多個任務重用線程,在請求到達時線程已經存在(若是有空閒線程時),因此無心中也消除了線程建立所帶來的延遲。這樣,就能夠當即爲請求服務,使應用程序響應更快。
三、提升線程的可管理性。使用線程池能夠統一分配、調優和監控線程。數據庫
上面知道了線程池的做用,那麼線程池它是如何工做的呢?其使用核心類是哪個呢?因此要作到合理利用線程池,必須對其實現原理了如指掌。編程
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此必須瞭解這個類的用法及其內部原理,下面咱們來看下ThreadPoolExecutor類的具體源碼解析。 緩存
3.1 繼承關係服務器
經過類的繼承關係能夠得知哪些方法源於哪裏(具體請看代碼),下面直接給出類的繼承結構的圖:多線程
3.2 構造方法 併發
在ThreadPoolExecutor類中提供了四個構造方法:框架
1 // 五個參數的構造函數 2 public class ThreadPoolExecutor extends AbstractExecutorService { 3 public ThreadPoolExecutor(int corePoolSize, 4 int maximumPoolSize, 5 long keepAliveTime, 6 TimeUnit unit, 7 BlockingQueue<Runnable> workQueue) { 8 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 9 Executors.defaultThreadFactory(), defaultHandler); 10 } 11 // 六個參數的構造函數-1 12 public ThreadPoolExecutor(int corePoolSize, 13 int maximumPoolSize, 14 long keepAliveTime, 15 TimeUnit unit, 16 BlockingQueue<Runnable> workQueue, 17 ThreadFactory threadFactory) { 18 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 19 threadFactory, defaultHandler); 20 } 21 22 //六個參數的構造函數 -2 23 public ThreadPoolExecutor(int corePoolSize, 24 int maximumPoolSize, 25 long keepAliveTime, 26 TimeUnit unit, 27 BlockingQueue<Runnable> workQueue, 28 RejectedExecutionHandler handler) { 29 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 30 Executors.defaultThreadFactory(), handler); 31 } 32 // 七個參數的構造函數 33 public ThreadPoolExecutor(int corePoolSize, 34 int maximumPoolSize, 35 long keepAliveTime, 36 TimeUnit unit, 37 BlockingQueue<Runnable> workQueue, 38 ThreadFactory threadFactory, 39 RejectedExecutionHandler handler) { 40 if (corePoolSize < 0 || 41 maximumPoolSize <= 0 || 42 maximumPoolSize < corePoolSize || 43 keepAliveTime < 0) 44 throw new IllegalArgumentException(); 45 if (workQueue == null || threadFactory == null || handler == null) 46 throw new NullPointerException(); 47 this.corePoolSize = corePoolSize; 48 this.maximumPoolSize = maximumPoolSize; 49 this.workQueue = workQueue; 50 this.keepAliveTime = unit.toNanos(keepAliveTime); 51 this.threadFactory = threadFactory; 52 this.handler = handler; 53 }
從源代碼中發現前面三個構造器都是調用的第四個構造器進行的初始化工做,那就以第四個構造函數爲例,解釋下其中各個參數的含義(留意源碼中每一個字段上的註釋):ide
2. int maximumPoolSize:線程池容許建立的最大線程數。
3. long keepAliveTime:空閒線程等待超時的時間
4. TimeUnit unit:參數keepAliveTime的時間單位。共有七種單位,以下:
public enum TimeUnit { /** * 納秒=千分之一微妙 */ NANOSECONDS {...}, /** * 微妙=千分之一毫秒 */ MICROSECONDS {...}, /** * 毫秒 */ MILLISECONDS {...}, /** * 秒 */ SECONDS {...}, /** * 分鐘 */ MINUTES {...}, /** * 小時 */ HOURS {...}, /** * 天 */ DAYS {...}; }
5. BlockingQueue<Runnable> workQueue: 任務隊列,用於保存等待執行任務的阻塞隊列。隊列也有好幾種詳細請看這裏,這裏就不作解釋了。
6. ThreadFactory threadFactory:線程工廠,主要用於建立線程。其中能夠指定線程名字(千萬別忽略這件小事,有意義的名字能讓你快速定位到源碼中的線程類)
7. RejectedExecutionHandler handler:飽和策略,當隊列和線程池都滿了,說明線程處於飽和狀態,那麼後續進來的任務須要一種策略處理。默認狀況下是AbortPolicy:表示沒法處理新任務時拋出異常。線程池框架提供瞭如下4中策略(固然也能夠本身自定義策略:經過實現RejectedExecutionHandler接口自定義策略):
3.3 重要參數方法和方法解讀
1. 線程池狀態
// 初始值 -536870912 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 29 private static final int COUNT_BITS = Integer.SIZE - 3; // 初始值 536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // RUNNING狀態:接受新任務並處理排隊任務 private static final int RUNNING = -1 << COUNT_BITS; // SHUTDOWN狀態:不接受新任務,但處理排隊任務 private static final int SHUTDOWN = 0 << COUNT_BITS; // STOP狀態:不接受新任務,不處理排隊任務,並中斷正在進行的任務 private static final int STOP = 1 << COUNT_BITS; // All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method private static final int TIDYING = 2 << COUNT_BITS; // TERMINATED: terminated() has completed private static final int TERMINATED = 3 << COUNT_BITS; // 獲取線程池狀態,取前三位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取當前正在工做的worker,主要是取後面29位 private static int workerCountOf(int c) { return c & CAPACITY; } // 生成ctl private static int ctlOf(int rs, int wc) { return rs | wc; }
當建立線程池後,初始時,線程池處於RUNNING狀態;
RUNNING -> SHUTDOWNN:若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;
(RUNNING or SHUTDOWN) -> STOP:若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;
SHUTDOWN -> TIDYING or STOP -> TIDYING :當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。
2. 線程池中的線程初始化
在說corePoolSize參數時有說到初始化線程池的兩個方法,其實在默認狀況下,建立線程池以後線程池中是沒有線程的,須要提交任務以後纔會建立線程。因此若是想在建立線程池以後就建立線程的話,能夠經過下面兩個方法建立:
/** * 單個建立核心線程 */ public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } /** * 啓動全部核心線程 */ public int prestartAllCoreThreads() { int n = 0; // 添加工做線程 while (addWorker(null, true)) ++n; return n; }
3. 建立線程:addWorker()
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取運行狀態 int rs = runStateOf(c); /** * 若是當前的線程池的狀態>SHUTDOWN 那麼拒絕Worker的add 若是=SHUTDOWN * 那麼此時不能新加入不爲null的Task,若是在WorkCount爲empty的時候不能加入任何類型的Worker, * 若是不爲empty能夠加入task爲null的Worker,增長消費的Worker */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取有效線程數,並判斷//若是當前的數量超過了CAPACITY,或者超過了corePoolSize和maximumPoolSize(試core而定),則直接返回 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // //CAS嘗試增長線程數,若是失敗,證實有競爭,那麼從新到retry。 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 繼續判斷當前線程池的運行狀態 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /** * 新建任務 */ Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); /** * rs!=SHUTDOWN ||firstTask!=null * * 一樣檢測當rs>SHUTDOWN時直接拒絕減少Wc,同時Terminate,若是爲SHUTDOWN同時firstTask不爲null的時候也要Terminate */ if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); //Stop或線程Interrupt的時候要停止全部的運行的Worker if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
從上面能夠看出:
在rs>SHUTDOWN時,拒絕一切線程的增長,由於STOP是會終止全部的線程,同時移除Queue中全部的待執行的線程的,因此也不須要增長first=null的Worker了。
其次,在SHUTDOWN狀態時,是不能增長first!=null的Worker的,同時即便first=null,可是此時Queue爲Empty也是不容許增長Worker的,SHUTDOWN下增長的Worker主要用於消耗Queue中的任務。
SHUTDOWN狀態時,是不容許向workQueue中增長線程的,isRunning(c) && workQueue.offer(command) 每次在offer以前都要作狀態檢測,也就是線程池狀態變爲>=SHUTDOWN時不容許新線程進入線程池了。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 原註釋已經講的很清楚了,主要分三步進行: */ int c = ctl.get(); // 一、若是線程數小於基本線程數,則建立線程並執行當前任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 二、若是任務能夠排隊,則會從新檢查看是否能夠啓動新的任務仍是拒絕任務 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 三、若是咱們沒法排隊任務,那麼咱們嘗試添加一個新線程。 若是失敗,咱們知道咱們已關閉或飽和,所以拒絕該任務。 else if (!addWorker(command, false)) reject(command); }
注意:該方法是沒有返回值的,若是想獲取線程執行後的結果能夠調用submit方法(固然它底層也是調用execute()方法)
五、線程池關閉:
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:
從上線的源碼分析後,應該知道線程池處理任務的大概流程了,下面統一梳理下當線程池接到任務時的處理流程:
一、線程池首先會判斷核心線程池是否已滿(核心線程數是否超過corePoolSize),若沒有則建立新的核心線程來處理任務,不然進行第二步;
二、接着會判斷阻塞隊列是否已滿(因此推薦使用有界隊列),若是沒有滿則進入阻塞隊列等待執行,不然進行第三步;
三、而後線程池會判斷整個線程池是否已滿(整個線程數是否超過maximunPoolSize),若沒有則建立新線程處理任務,不然交個飽和策略處理新的任務。
一、建立線程或線程池時請指定有意義的線程名稱,方便回溯。來源《阿里巴巴 Java開發手冊》
二、線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣 的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。來源《阿里巴巴 Java開發手冊》
說明:Executors 返回的線程池對象的弊端以下:
1)FixedThreadPool 和 SingleThreadPool: 容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM。
三、合理配置線程池大小,能夠從如下幾個角度來進行分析:
比方說:對於 CPU 密集型的計算場景,理論上「線程的數量 = CPU核數」是最合適的。
若是是IO密集型任務,參考值能夠設置爲CPU 核數 * [ 1 +(I/O 耗時 / CPU耗時)]
注意:以上值僅供參考,須要根據具體實際狀況(壓測)而定。
四、建議使用有界隊列
五、合理設置空閒線程等待時間。
若是任務不少且每一個任務執行的時間比較短,則能夠調大時間,提升線程利用率。
https://www.cnblogs.com/dolphin0520/p/3932921.html
http://ifeve.com/java-threadpool/
《Java併發編程的藝術》
《阿里巴巴Java開發手冊》