血的教訓之背景:使用線程池對存量數據進行遷移,可是總有一批數據遷移失敗,無異常日誌打印java
據說parallelStream
並行流是個好東西,因爲平常開發stream
串行流的場景比較多,此次須要寫遷移程序恰好能夠用得上,那還不趕忙拿來裝*一下,此時不裝更待什麼時候。機智的我還知道在 JVM 的後臺,使用通用的 fork/join 池來完成上述功能,該池是全部並行流共享的,默認狀況,fork/join 池會爲每一個處理器分配一個線程,對應的變通方案就是建立本身的線程池如併發
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); pool.submit(() -> { list.parallelStream().collect(Collectors.toList()); });
因而地雷就是從這裏埋下的。異步
public static void main(String[] args) throws InterruptedException, ExecutionException { final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); List<Integer> list = Lists.newArrayList(1, 2, 3, null); //1.使用submit pool.submit(() -> { list.parallelStream().map(a -> a.toString()).collect(Collectors.toList()); }); TimeUnit.SECONDS.sleep(3); //2.使用 execute pool.execute(() -> { list.parallelStream().map(a -> a.toString()).collect(Collectors.toList()); }); //3.使用submit,調用get() pool.submit(() -> { list.parallelStream().map(a -> a.toString()).collect(Collectors.toList()); }).get(); TimeUnit.SECONDS.sleep(3); }
讀者自行跑一下上面的用例,會發現單獨使用submit
方法的並不會打印出錯誤日誌,而使用execute
方法打印出了錯誤日誌,可是對submit
返回的FutureJoinTask
調用get()
方法,又會拋出異常。因而真相大白,部分批次中的數據存在髒數據,爲null值,遍歷到該null值的時候出現了異常,可是異常日誌在submit
方法中給catch住,沒有打印出來(心痛的感受),而被捕獲的異常,被包裝在返回的結果類FutureJoinTask
中,並無再次拋出。工具
submit
方法 結論先行,我犯的錯誤就是,淺顯的認爲submit
和execute
的區別就只是一個有返回異步結果,一個沒有返回一步結果,可是事實是殘酷的。在submit()
中邏輯必定包含了將異步任務拋出的異常捕獲,而由於使用方法不當而致使該異常沒有再次拋出。post
如今提出一個問題,ForkJoinPool#submit()
中返回的ForkJoinTask
能夠獲取異步任務的結果,現這個異步拋出了異常,咱們嘗試獲取該任務的結果會是如何? 咱們直接看ForkJoinTask#get()
的源碼。this
public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); //這裏能夠直接看到,異步任務出現異常會在調用get()獲取結果的時候,會被包裝成ExecutionException再次拋出 if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); return getRawResult(); }
異步任務出現異常會在調用get()獲取結果的時候,會被包裝成ExecutionException
再次拋出,可是異常是在哪裏被捕獲的呢?萬變不離其宗,全部線程的線程都須要重寫Thread#run()
方法, 投遞到ForkJoinPool
的線程會被包裝成ForkJoinWorkerThread
,所以咱們看一下ForkJoinWorkerThread#run()
的實現.線程
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); pool.runWorker(workQueue); } catch (Throwable ex) { //出現異常,捕獲,再次拋出會在調用ForkJoinTask#get()的時候 exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
上面的分析是基於ForkJoinPool
的,是否是全部的線程池的submit
和execute
方法的實現都是相似這樣,咱們經常使用的線程池ThreadPoolThread
實現會是怎樣的,一樣的思路,咱們須要找到投遞到ThreadPoolThread
的異步任務最終被包裝爲哪一個Thread
的子類或者是實現java.lang.Runnable#run
,答案就是java.util.concurrent.FutureTask
日誌
public void run() { ... try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { //捕獲異常 result = null; ran = false; setException(ex); } if (ran) set(result); } } .... }
java.util.concurrent.ExecutorService#submit(java.lang.Runnable)
爲什麼線程池會有這種設定,實際上咱們的思路不該該侷限於線程池,而是放在獲取異步任務結果,異常是否也是屬於異步結果,FutureTask
做爲JDK提供的併發工具類的實現中,已經給出了很好的答案,即獲取異步任務結果,異常也是屬於異步結果,若是異步任務出現運行時異常,那麼在獲取該任務的結果時,該異常會被從新包裝拋出code
做者:plz叫我紅領巾 開發
出處:http://www.javashuo.com/article/p-qwvevnug-ep.html
本博客歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。碼子不易,您的點贊是我習做最大的動力