前言 java
=====spring
談到 Java 的線程池最熟悉的莫過於 ExecutorService 接口了,jdk1.5 新增的 java.util.concurrent 包下的這個 api,大大的簡化了多線程代碼的開發。而不論你用 FixedThreadPool 仍是 CachedThreadPool 其背後實現都是ThreadPoolExecutor。數據庫
ThreadPoolExecutor 是一個典型的緩存池化設計的產物,由於池子有大小,當池子體積不夠承載時,就涉及到拒絕策略。JDK 中已經預設了 4 種線程池拒絕策略,下面結合場景詳細聊聊這些策略的使用場景,以及咱們還能擴展哪些拒絕策略。api
池話設計應該不是一個新名詞。咱們常見的如 Java 線程池、JDBC 鏈接池、Redis 鏈接池等就是這類設計的表明實現。這種設計會初始預設資源,解決的問題就是抵消每次獲取資源的消耗,如建立線程的開銷,獲取遠程鏈接的開銷等。緩存
就比如你去食堂打飯,打飯的大媽會先把飯盛好幾份放那裏,你來了就直接拿着飯盒加菜便可,不用再臨時又盛飯又打菜,效率就高了。多線程
除了初始化資源,池化設計還包括以下這些特徵:池子的初始值、池子的活躍值、池子的最大值等,這些特徵能夠直接映射到 Java 線程池和數據庫鏈接池的成員屬性中。併發
和數據源鏈接池不同,線程池除了初始大小和池子最大值,還多了一個阻塞隊列來緩衝。數據源鏈接池通常請求的鏈接數超過鏈接池的最大值的時候就會觸發拒絕策略,策略通常是阻塞等待設置的時間或者直接拋異常。而線程池的觸發時機以下圖:運維
如圖,想要了解線程池何時觸發拒絕粗略,須要明確上面三個參數的具體含義,是這三個參數整體協調的結果,而不是簡單的超過最大線程數就會觸發線程拒絕粗略,當提交的任務數大於 corePoolSize 時,會優先放到隊列緩衝區,只有填滿了緩衝區後,纔會判斷當前運行的任務是否大於 maxPoolSize,小於時會新建線程處理。大於時就觸發了拒絕策略,總結就是:當前提交任務數大於(maxPoolSize + queueCapacity)時就會觸發線程池的拒絕策略了。ide
在分析 JDK 自帶的線程池拒絕策略前,先看下 JDK 定義的 拒絕策略接口,以下:性能
1public interface RejectedExecutionHandler { 2 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 3}
接口定義很明確,當觸發拒絕策略時,線程池會調用你設置的具體的策略,將當前提交的任務以及線程池實例自己傳遞給你處理,具體做何處理,不一樣場景會有不一樣的考慮,下面看 JDK 爲咱們內置了哪些實現:
1 public static class CallerRunsPolicy implements RejectedExecutionHandler { 2 3 public CallerRunsPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 if (!e.isShutdown()) { 7 r.run(); 8 } 9 } 10 }
功能:當觸發拒絕策略時,只要線程池沒有關閉,就由提交任務的當前線程處理。
使用場景:通常在不容許失敗的、對性能要求不高、併發量較小的場景下使用,由於線程池通常狀況下不會關閉,也就是提交的任務必定會被運行,可是因爲是調用者線程本身執行的,當屢次提交任務時,就會阻塞後續任務執行,性能和效率天然就慢了。
1 public static class AbortPolicy implements RejectedExecutionHandler { 2 3 public AbortPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 throw new RejectedExecutionException("Task " + r.toString() + 7 " rejected from " + 8 e.toString()); 9 } 10 }
功能:當觸發拒絕策略時,直接拋出拒絕執行的異常,停止策略的意思也就是打斷當前執行流程
使用場景:這個就沒有特殊的場景了,可是一點要正確處理拋出的異常。ThreadPoolExecutor 中默認的策略就是AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 由於都沒有顯示的設置拒絕策略,因此默認的都是這個。可是請注意,ExecutorService 中的線程池實例隊列都是無界的,也就是說把內存撐爆了都不會觸發拒絕策略。當本身自定義線程池實例時,使用這個策略必定要處理好觸發策略時拋的異常,由於他會打斷當前的執行流程。
1 public static class DiscardPolicy implements RejectedExecutionHandler { 2 3 public DiscardPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 } 7 }
功能:直接靜悄悄的丟棄這個任務,不觸發任何動做
使用場景:若是你提交的任務可有可無,你就可使用它 。由於它就是個空實現,會悄無聲息的吞噬你的的任務。因此這個策略基本上不用了
1 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 2 3 public DiscardOldestPolicy() { } 4 5 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 6 if (!e.isShutdown()) { 7 e.getQueue().poll(); 8 e.execute(r); 9 } 10 } 11 }
功能:若是線程池未關閉,就彈出隊列頭部的元素,而後嘗試執行
使用場景:這個策略仍是會丟棄任務,丟棄時也是毫無聲息,可是特色是丟棄的是老的未執行的任務,並且是待執行優先級較高的任務。基於這個特性,我能想到的場景就是,發佈消息,和修改消息,當消息發佈出去後,還未執行,此時更新的消息又來了,這個時候未執行的消息的版本比如今提交的消息版本要低就能夠被丟棄了。由於隊列中還有可能存在消息版本更低的消息會排隊執行,因此在真正處理消息的時候必定要作好消息的版本比較
1public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { 2 3 protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); 4 5 private final String threadName; 6 7 private final URL url; 8 9 private static volatile long lastPrintTime = 0; 10 11 private static Semaphore guard = new Semaphore(1); 12 13 public AbortPolicyWithReport(String threadName, URL url) { 14 this.threadName = threadName; 15 this.url = url; 16 } 17 18 @Override 19 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 20 String msg = String.format("Thread pool is EXHAUSTED!" + 21 " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + 22 " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", 23 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), 24 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), 25 url.getProtocol(), url.getIp(), url.getPort()); 26 logger.warn(msg); 27 dumpJStack(); 28 throw new RejectedExecutionException(msg); 29 } 30 31 private void dumpJStack() { 32 //省略實現 33 } 34}
能夠看到,當dubbo的工做線程觸發了線程拒絕後,主要作了三個事情,原則就是儘可能讓使用者清楚觸發線程拒絕策略的真實緣由
1 private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { 2 NewThreadRunsPolicy() { 3 super(); 4 } 5 6 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 7 try { 8 final Thread t = new Thread(r, "Temporary task executor"); 9 t.start(); 10 } catch (Throwable e) { 11 throw new RejectedExecutionException( 12 "Failed to start a new thread", e); 13 } 14 } 15 }
Netty 中的實現很像 JDK 中的 CallerRunsPolicy,捨不得丟棄任務。不一樣的是,CallerRunsPolicy 是直接在調用者線程執行的任務。而 Netty是新建了一個線程來處理的。因此,Netty的實現相較於調用者執行策略的使用面就能夠擴展到支持高效率高性能的場景了。可是也要注意一點,Netty的實現裏,在建立線程時未作任何的判斷約束,也就是說只要系統還有資源就會建立新的線程來處理,直到new不出新的線程了,纔會拋建立線程失敗的異常
1 new RejectedExecutionHandler() { 2 @Override 3 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 4 try { 5 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 6 } catch (InterruptedException e) { 7 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 8 } 9 10 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 11 } 12 });
activeMq中的策略屬於最大努力執行任務型,當觸發拒絕策略時,在嘗試一分鐘的時間從新將任務塞進任務隊列,當一分鐘超時還沒成功時,就拋出異常
1public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { 2 private final RejectedExecutionHandler[] handlerChain; 3 4 public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) { 5 Objects.requireNonNull(chain, "handlerChain must not be null"); 6 RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); 7 return new RejectedExecutionHandlerChain(handlerChain); 8 } 9 10 private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { 11 this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); 12 } 13 14 @Override 15 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 16 for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { 17 rejectedExecutionHandler.rejectedExecution(r, executor); 18 } 19 } 20}
pinpoint的拒絕策略實現頗有特色,和其餘的實現都不一樣。他定義了一個拒絕策略鏈,包裝了一個拒絕策略列表,當觸發拒絕策略時,會將策略鏈中的rejectedExecution依次執行一遍。
前文從線程池設計思想,以及線程池觸發拒絕策略的時機引出java線程池拒絕策略接口的定義。並輔以JDK內置4種以及四個第三方開源軟件的拒絕策略定義描述了線程池拒絕策略實現的各類思路和使用場景。但願閱讀此文後能讓你對java線程池拒絕策略有更加深入的認識,可以根據不一樣的使用場景更加靈活的應用。