以前在工做中遇到一個問題,我定義了一個線程池來執行任務,程序執行結束後任務沒有所有執行完。mysql
業務場景是這樣的:因爲統計業務須要,訂單信息須要從主庫中通過統計業務代碼寫入統計庫。因爲代碼質量及歷史緣由,目前的從新統計接口是單線程的,粗略算了算一共有100萬條訂單信息,每100條的處理大約是10秒,因此理論上處理徹底部信息須要28個小時,這還不算由於 mysql 中 limit 分頁致使的後期查詢時間以及可能出現的內存溢出致使停止統計的狀況。sql
基於上述的緣由,以及最重要的一點:統計業務是根據訂單所屬的中心進行的,各個中心同時統計不會致使髒數據。因此,我計劃使用線程池,爲每個中心分配一條線程去執行統計業務。函數
// 線程工廠,用於爲線程池中的每條線程命名 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("stats-pool-%d").build(); // 建立線程池,使用有界阻塞隊列防止內存溢出 ExecutorService statsThreadPool = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), namedThreadFactory); // 遍歷全部中心,爲每個centerId提交一條任務到線程池 statsThreadPool.submit(new StatsJob(centerId));
在建立完線程池後,爲每個 centerId 提交一條任務到線程池。在個人預想中,因爲線程池的核心線程數爲5,最多5箇中心同時進行統計業務,將大大縮短100萬條數據的總統計時間,因而萬分興奮的我開始執行從新統計業務了。ui
在跑了好久以後,當我查看統計進度時,我發現了一個十分詭異的問題(以下圖)。藍框標出的這條線程是 WAIT 狀態,代表這條線程是空閒狀態,可是從日誌中我看到這條線程並無完成它的任務,由於這個中心的數據有10萬條,可是日誌顯示它只跑到了一半,以後就再無關於此中心的日誌了。this
這是什麼緣由?spa
能夠想到的是,這條線程由於某些緣由被阻塞了,而且沒有繼續進行下去,可是日誌又沒有任何異常信息...線程
可能有經驗的工程師已經知道了緣由...調試
因爲我的水平的線程,暫時沒有找到緣由的我只能放棄使用線程池,乖乖用單線程跑...日誌
幸運的是,單線程跑的任務居然拋錯了(爲何要說幸運?),因而立刻想到,以前那條 WAIT 狀態的線程多是由於一樣的拋錯因此被中斷了,致使任務沒有繼續進行下去。code
爲何說幸運?由於若是單線程的任務沒有拋錯的話,我可能好久都想不到是這個緣由。
工做上的問題到這裏就找到緣由了,以後的解決過程也十分簡單,這裏就不提了。
可是疑問又來了,爲何使用線程池的時候,線程因異常被中斷卻沒有拋出任何信息呢?還有平時若是是在 main 函數裏面的異常也會被拋出來,而不是像線程池這樣被吞掉。
若是子線程拋出了異常,線程池會如何進行處理呢?
我提交任務到線程池的方式是:
threadPoolExecutor.submit(Runnbale task);
,後面瞭解到使用 execute() 方式提交任務會把異常日誌給打出來,這裏研究一下爲何使用 submit 提交任務,在任務中的異常會被「吞掉」。
對於 submit() 形式提交的任務,咱們直接看源碼:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 被包裝成 RunnableFuture 對象,而後準備添加到工做隊列 RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
它會被線程池包裝成 RunnableFuture 對象,而最終它實際上是一個 FutureTask 對象,在被添加到線程池的工做隊列,而後調用 start() 方法後, FutureTask 對象的 run() 方法開始運行,即本任務開始執行。
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this,runnerOffset,null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { // 捕獲子任務中的異常 result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
在 FutureTask 對象的 run() 方法中,該任務拋出的異常被捕獲,而後在setException(ex); 方法中,拋出的異常會被放到 outcome 對象中,這個對象就是 submit() 方法會返回的 FutureTask 對象執行 get() 方法獲得的結果。可是在線程池中,並無獲取執行子線程的結果,因此異常也就沒有被拋出來,即被「吞掉」了。
這就是線程池的 submit() 方法提交任務沒有異常拋出的緣由。
在定義 ThreadFactory 的時候調用setUncaughtExceptionHandler
方法,自定義異常處理方法。
例如:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("judge-pool-%d") .setUncaughtExceptionHandler((thread, throwable)-> logger.error("ThreadPool {} got exception", thread,throwable)) .build();
這樣,對於線程池中每條線程拋出的異常都會打下 error 日誌,就不會看不到了。
在修復了單個線程任務的異常以後,我繼續使用線程池進行從新統計業務,終於跑完了,也終於完成了這個任務。
小結:使用線程池時須要注意,子線程的異常,若是沒有被捕獲就會丟失,可能會致使後期根據日誌調試時沒法找到緣由。