最近精讀Netty源碼,讀到NioEventLoop部分的時候,發現對Java線程&線程池有些概念還有困惑, 因此深刻總結一下
Java線程池一:線程基礎
Java線程池二:線程池原理html
Java線程映射的是系統內核線程,是稀缺資源,使用線程池主要有如下幾點好處java
execute
方法提交任務submit
方法 & 提供線程池關閉的方法(shutdown, shutdowNow)任務提交過程見下流程圖
api
核心線程池滿時,任務會嘗試提交到工做隊列,後續工做線程會從工做隊列中獲取任務執行。數組
由於涉及到多個線程對工做隊列的讀寫,因此工做隊列須要是線程安全的,Java提供瞭如下幾種線程安全的隊列(BlockingQueue)安全
實現類 | 工做機制 |
---|---|
ArrayBlockingQueue | 底層實現是數組 |
LinkedBlockingDeque | 底層實現是鏈表 |
PriorityBlockingQueue | 優先隊列,本質是個小頂堆 |
DelayQueue | 延時隊列 (優先隊列 & 元素實現Delayed接口),ScheduledThreadPoolExecutor實現的關鍵 |
SynchronousQueue | 同步隊列 |
操做 | 描述 |
---|---|
add/remove | 隊列已滿/隊列已空時,拋出異常 |
put/take | 隊列已滿/隊列已空時,阻塞等待 |
offer/poll | 隊列已滿/隊列已空時,返回特殊值(false/null) |
offer(time) / poll(time) | 超時時間內沒法寫入或者讀取成功,返回特殊值 |
拒絕策略是當線程池滿負載時(任務隊列已滿 & 線程池已滿)對新提交任務的處理策略,jdk提供了以下四種實現,其中AbortPolicy是默認實現。併發
實現類 | 工做機制 |
---|---|
AbortPolicy | 拋出RejectedExecutionException異常 |
CallerRunsPolicy | 調用線程執行該任務 |
DiscardOldestPolicy | 丟棄工做隊列頭部任務,再嘗試提交該任務 |
DiscardPolicy | 直接丟棄 |
固然咱們能夠有自定義的實現,好比記錄日誌、任務實例持久化,同時發送報警到開發人員。框架
線程池提供了幾個submit方法, 調用線程能夠根據返回的Future對象獲取任務執行結果,那麼它的實現原理又是什麼吶?oop
裝飾模式對task的run方法進行加強
this
1.提交任務前,會把task裝飾成一個FutureTask對象線程
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
2.FutureTask對象的run方法會存儲返回的結果或者異常。調用方能夠根據FutureTask獲取任務的執行結果。
//省略了部分代碼 public void run() { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //執行任務 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //存儲異常 setException(ex); } if (ran) //存儲返回值 set(result); }
shutdown將線程池的狀態設置成SHUTDOWN,同時拒絕提交新的任務,可是已提交的任務會正常執行
shutdownNow將線程池的狀態設置成STOP,該狀態下拒絕提交新的任務 & 丟棄工做隊列中的任務& 中斷當前活躍的線程(嘗試中止正在執行的任務)
須要注意的是shutdownNow對於正在執行的任務只是嘗試中止
,不保證成功(取決於任務是否監聽處理中斷位)
ScheduledThreadPoolExecutor在ThreadPoolExecutor之上擴展實現了定時調度的能力
1.實例化時工做隊列使用延時隊列(DelayedWorkQueue)--- 本質是個小頂堆
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }
2.提交的任務裝飾成ScheduledFutureTask類型,並把任務加入到工做隊列(不直接調用execute)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //裝飾 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //任務加入工做隊列 delayedExecute(t); return t; }
3.ScheduledFutureTask實現Delayed和Comparable接口
因此提交到工做隊列中的任務是按照任務執行時間排序的(最先執行的任務在頭部),由於工做隊列是個小頂堆。
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
4.只能從工做隊列中獲取已到執行時間的任務
public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first = queue[0]; //若是頭部的任務尚未到執行時間, 直接返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } }
假設:CPU核心數是N,每一個任務的執行時間是T,任務的超時時間是timeout,核心線程數是corePoolSize,工做隊列大小是workQueue, 最大線程數是 maxPoolSize, 任務最大併發數爲maxTasks
對於CPU密集型任務:corePoolSize 大小設置成和CPU核心數接近,如N+1 或者 N+2
對於IO密集型任務:corePoolSize能夠設置的比較大一些,如2N~3N;也能夠經過以下邏輯進行估算
假設80%的時間是IO操做,那麼每一個任務須要佔用CPU時間大概是0.2T, 每秒每一個CPU核心最大能夠執行的任務數爲 = (1/0.2T) = 5/T;因此理論上
80%IO的狀況下corePoolSize能夠設置爲 5N (一個cpu能夠對應5個工做線程)
工做隊列的大小取決於任務的超時時間 & 核心線程池的吞吐量
則 workQueue = corePoolSize * (1/T) * timeout = (corePoolSize * timeout) / T
須要注意的是: 工做隊列不能使用無界隊列。(無界隊列異常狀況下可能耗盡系統資源,形成服務不可用)
最大線程數的大小取決於最大的任務併發數 & 工做隊列的大小 & 任務的執行時間
則 maxPoolSize = (maxTasks - workQueue) / T
對於可有可無的任務,咱們能夠直接丟棄;對於一些重要的任務須要對任務進行持久化,以便後續進行補償和恢復。
咱們能夠有個定時腳本將線程池的最大線程數、工做隊列大小、已經執行的任務數、已經拒絕的任務數等數據推送到監控系統
這樣咱們能夠根據這些數據對線程池進行調優,也能夠即便感知線上業務異常。