最近看完了ElasticSearch線程池模塊的源碼,感觸頗深,而後也自不量力地借鑑ES的 EsThreadPoolExecutor 從新造了一把輪子(源碼在這裏),對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實現自定義的線程池時,ES先重寫了Runnable接口,提供了更靈活的任務運行過程當中出現異常處理邏輯。簡而言之,它採用回調機制實現了線程在運行過程當中拋出未受檢異常的統一處理邏輯,很是優美。實在忍不住把源碼copy下來:html
/** * An extension to runnable. */ public abstract class AbstractRunnable implements Runnable { /** * Should the runnable force its execution in case it gets rejected? */ public boolean isForceExecution() { return false; } @Override public final void run() { try { doRun(); } catch (Exception t) { onFailure(t); } finally { onAfter(); } } /** * This method is called in a finally block after successful execution * or on a rejection. */ public void onAfter() { // nothing by default } /** * This method is invoked for all exception thrown by {@link #doRun()} */ public abstract void onFailure(Exception e); /** * This should be executed if the thread-pool executing this action rejected the execution. * The default implementation forwards to {@link #onFailure(Exception)} */ public void onRejection(Exception e) { onFailure(e); } /** * This method has the same semantics as {@link Runnable#run()} * @throws InterruptedException if the run method throws an InterruptedException */ protected abstract void doRun() throws Exception; }
統一的任務執行入口方法doRun(),由各個子類實現doRun()執行具體的業務邏輯java
try-catch中統一處理線程執行任務過程當中拋出的異常,由onFailure()處理git
任務執行完成(不論是正常結束仍是運行過程當中拋出了異常),統一由onAfter()處理程序員
isForceExecution
方法,用來支持任務在提交給線程池被拒絕了,強制執行。固然了,這須要線程池的任務隊列提供相關的支持。我也是受這種方式的啓發,實現了一個線程在執行任務過程當中拋出未受檢異常時,先判斷該任務是否容許強制執行isForceExecution,而後再從新提交任務運行的線程池。github
此外,ES內置了好幾個默認實現的線程池,好比 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。apache
QueueResizingEsThreadPoolExecutorless
在建立線程池時會指定一個任務隊列(BlockingQueue),日常都是直接用 LinkedBlockingQueue,它是一個無界隊列,固然也能夠在構造方法中指定隊列的長度。可是,ES中幾乎不用 LinkedBlockingQueue 做爲任務隊列,而是使用 LinkedTransferQueue ,可是 LinkedTransferQueue 又是一個無界隊列,因而ES又基於LinkedTransferQueue 封裝了一個任務隊列,類名稱爲 ResizableBlockingQueue,它可以限制任務隊列的長度。elasticsearch
那麼問題來了,對於一個線程池,任務隊列設置爲多長合適呢?ide
答案就是Little's Law。在QueueResizingEsThreadPoolExecutor 線程池中重寫了afterExecute()方法,裏面統計了每一個任務的運行時間、等待時間(入隊列到執行)。因此,你想知道如何統計一個任務的運行時間嗎?你想統計線程池一共提交了多少個任務,全部任務的運行時間嗎?看看QueueResizingEsThreadPoolExecutor 源碼就明白了。性能
另外再提一個問題,爲何ES用 LinkedTransferQueue 做爲任務隊列而不用 LinkedBlockingQueue 呢?
我想:很重要的一個緣由是LinkedBlockingQueue 是基於重量級的鎖(ReentrantLock)實現的入隊操做,而LinkedTransferQueue 是基於CAS原子指令實現的入隊操做。LinkedBlockingQueue#offer()當隊列長度達到最大值,此時不能提交任務給隊列了,直接返回false,不然經過加鎖方式將任務提交給隊列。LinkedTransferQueue自己是無界的,所以添加任務到LinkedTransferQueue時,經過CAS實現避免了加鎖帶來的上下文開銷的切換,在大部分競爭狀況下,是會提高性能的。
PrioritizedEsThreadPoolExecutor
優先級任務的線程池,任務提交給線程池後是在任務隊列裏面排隊,FIFO模式。而這個線程池則容許任務定義一個優先級,優先級高的任務先執行。
EsThreadPoolExecutor
這個線程池很是像JDK裏面的ThreadPoolExecutor,不過,它實現了一些拒絕處理邏輯,提交任務若被拒絕(會拋出EsRejectedExecutionException異常),則進行相關處理
@Override public void execute(final Runnable command) { doExecute(wrapRunnable(command)); } protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } }
講完了ES中經常使用的三個線程池實現,還想結合JDK源碼,記錄一下線程在執行任務過程當中拋出運行時異常,是如何處理的。我以爲有二種方式(或者說有2個地方)來處理運行時異常。一種方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另外一種方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException
afterExecute
看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源碼註釋:
Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.
提交給線程池的任務,執行完(不論是正常結束,仍是執行過程當中出現了異常)後都會自動調用afterExecute()方法。若是執行過程當中出現了異常,那麼Throwable t 就不爲null,而且致使執行終止(terminate abruptly.)。
This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.
默認的afterExecute(Runnable r, Throwable t) 方法是一個空實現,什麼也沒有。所以,在繼承ThreadPoolExecutor實現本身的線程池時,若是重寫該方法,則要記住:先調用 super.afterExecute
好比說這樣幹:
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { //出現了異常 if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) { //AbstractRunnable 設置爲強制執行時從新拉起任務 execute(r); logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage()); } } }
看,重寫afterExecute方法,當 Throwable 不爲null時,代表線程執行任務過程當中出現了異常,這時就從新提交任務。
有個時候,在實現 Kafka 消費者線程的時候(while true循環),常常由於解析消息出錯致使線程拋出異常,就會致使 Kafka消費者線程掛掉,這樣就永久丟失了一個消費者了。而經過這種方式,當消費者線程掛了時,可從新拉起一個新任務。
uncaughtException
建立 ThreadPoolExecutor時,要傳入ThreadFactory 做爲參數,在而建立ThreadFactory 對象時,就能夠設置線程的異常處理器java.lang.Thread.UncaughtExceptionHandler。
在用Google Guava包的時候,通常這麼幹:
//先 new Thread.UncaughtExceptionHandler對象 exceptionHandler private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
在線程執行任務過程當中,若是拋出了異常,就會由JVM調用 Thread.UncaughtExceptionHandler 中實現的異常處理邏輯。看Thread.UncaughtExceptionHandler的JDK源碼註釋:
Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.
When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler's uncaughtException method, passing the thread and the exception as arguments.
其大意就是:若是線程在執行Runnable任務過程由於 uncaught exception 而終止了,那麼 JVM 就會調用getUncaughtExceptionHandler 方法查找是否設置了異常處理器,若是設置了,那就就會調用異常處理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,這樣咱們就能夠在這個方法裏面定義異常處理邏輯了。
ES的ThreadPool 模塊是學習線程池的很是好的一個示例,實踐出真知。它告訴你如何自定義線程池(用什麼任務隊列?cpu核數、任務隊列長度等參數如何配置?)。在實現自定義任務隊列過程當中,也進一步理解了CAS操做的原理,如何巧妙地使用CAS?是失敗重試呢?仍是直接返回?。我想,這也是CAS與synchronized鎖、ReentrantLock鎖的一個最重要應用區別:多個線程在競執行 synchronized鎖 或者 ReentrantLock鎖 鎖住的代碼(術語叫臨界區)時,未搶到鎖的進程會被掛起,會伴隨上下文切換,而若能夠把臨界區中的代碼邏輯基於CAS原子指令來實現,若是某個線程執行CAS操做失敗了,它能夠選擇繼續重試,仍是執行其它的處理邏輯,仍是sleep若干毫秒。所以,它把線程執行的主動權交回給了程序員。好比基於CAS實現自增操做,失敗時繼續重試(這裏自增操做邏輯自己要求"失敗重試直到加1成功"),直到加1成功,代碼是這樣的:
do{ v = value.get(); }while(v!=value.compareAndSwap(v,v+1));
有個時候,代碼裏面CAS失敗,並不必定就須要當即重試,由於,CAS失敗了,意味着此時有其餘線程也在競爭,說明資源的競爭較激烈,那咱們是否是能夠先 sleep 一下再重試呢?這樣是否是更好?
線程在執行Runnable任務過程當中拋出了異常如何處理?這裏提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自動調用的,後者則是在每一個任務執行結束後都會被調用。
Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不一樣的。RejectedExecutionHandler 用來處理任務在提交的時候,被線程池拒絕了,該怎麼辦的問題,默認是AbortPolicy,即:直接丟棄。
Lucene 源碼 org.apache.lucene.util.CloseableThreadLocal 解決了使用JDK ThreadLocal 時 JAVA對象 長期駐留內存得不到及時清除的問題,也值得好好分析一番 :) 原文:https://www.cnblogs.com/hapjin/p/10617702.html