多線程編程很難,難點在於多線程代碼的執行不是按照咱們直覺上的執行順序。因此多線程編程必需要創建起一個宏觀的認識。java
線程池是多線程編程中的一個重要概念。爲了可以更好地使用多線程,學習好線程池固然是必須的。git
平時咱們在使用多線程的時候,一般都是架構師配置好了線程池的 Bean,咱們須要使用的時候,提交一個線程便可,不須要過多關注其內部原理。github
在學習一門新的技術以前,咱們仍是先了解下爲何要使用它,使用它可以解決什麼問題:面試
例如:編程
記建立線程消耗時間T1,執行任務消耗時間T2,銷燬線程消耗時間T3緩存
若是T1+T3>T2,那麼是否是說開啓一個線程來執行這個任務太不划算了!多線程
正好,線程池緩存線程,可用已有的閒置線程來執行新任務,避免了T1+T3帶來的系統開銷架構
咱們知道線程能共享系統資源,若是同時執行的線程過多,就有可能致使系統資源不足而產生阻塞的狀況併發
運用線程池能有效的控制線程最大併發數,避免以上的問題異步
好比:延時執行、定時循環執行的策略等
運用線程池都能進行很好的實現
在 Java 中,新建一個線程池對象很是簡單,Java 自己提供了工具類java.util.concurrent.Executors
,可使用以下代碼建立一個固定數量線程的線程池:
ExecutorService service = Executors.newFixedThreadPool(10);
注意:以上代碼用來測試還能夠,實際使用中最好可以顯示地指定相關參數。
咱們能夠看下其內部源碼實現:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
在阿里巴巴代碼規範中,建議咱們本身指定線程池的相關參數,爲的是讓開發人員可以自行理解線程池建立中的每一個參數,根據實際狀況,建立出合理的線程池。接下來,咱們來剖析下java.util.concurrent.ThreadPoolExecutor
的構造方法參數。
java.util.concurrent.ThreadPoolExecutor
有多個構造方法,咱們拿參數最多的構造方法來舉例,如下是阿里巴巴代碼規範中給出的建立線程池的範例:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
貼一張IDEA中的圖更方便看:
首先最重要的幾個參數,可能就是:corePoolSize
,maximumPoolSize
,workQueue
了,先看下這幾個參數的解釋:
因爲本文是初步瞭解線程池,因此先理解這幾個參數,上文對於這三個參數的解釋,基本上跟JDK源碼中的註釋一致(java.util.concurrent.ThreadPoolExecutor#execute
裏的代碼)。
咱們編寫個程序來方便理解:
// 建立線程池 ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); // 等待執行的runnable Runnable runnable = () -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; // 啓動的任務數量 int counts = 1224; for (int i = 0; i < counts; i++) { service.execute(runnable); } // 監控線程池執行狀況的代碼 ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("當前排隊線程數:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("當前活動線程數:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("執行完成線程數:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("總線程數:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
線程池的容量與咱們啓動的任務數量息息相關。
已知:
咱們修改同時 execute 添加到線程池的 Runnable 數量 counts:
當前排隊線程數:0 當前活動線程數:3 執行完成線程數:0 總線程數:3
當前排隊線程數:15 當前活動線程數:5 執行完成線程數:0 總線程數:20
當前排隊線程數:1024 當前活動線程數:105 執行完成線程數:0 總線程數:1129
java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]
此次的踩坑纔是我寫這篇文章的初衷,藉此機會好好了解下線程池的各個概念。自己這段時間在研究爬蟲,爲了儘可能提升爬蟲的效率,用到了多線程處理。因爲代碼寫得比較隨性,因此遇到了一個阻塞的問題,研究了一下才搞明白,模擬的代碼以下:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); @Test public void testBlock() { Runnable runnableOuter = () -> { try { Runnable runnableInner1 = () -> { try { TimeUnit.SECONDS.sleep(3); // 模擬比較耗時的爬蟲操做 } catch (InterruptedException e) { e.printStackTrace(); } }; Future<?> submit = service.submit(runnableInner1); submit.get(); // 實際業務中,runnableInner2須要用到此處返回的參數,因此必須get Runnable runnableInner2 = () -> { try { TimeUnit.SECONDS.sleep(5); // 模擬比較耗時的爬蟲操做 } catch (InterruptedException e) { e.printStackTrace(); } }; Future<?> submit2 = service.submit(runnableInner2); submit2.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }; for (int i = 0; i < 20; i++) { service.execute(runnableOuter); } ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("當前排隊線程數:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("當前活動線程數:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("執行完成線程數:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("總線程數:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }
線程池是前文的線程池,參數徹底不變。線程的監控代碼也一致。當咱們運行這個單元測試的時候,會發現打印出來的結果一直是以下:
當前排隊線程數:15 當前活動線程數:5 執行完成線程數:0 總線程數:20 當前排隊線程數:20 當前活動線程數:5 執行完成線程數:0 總線程數:25 當前排隊線程數:20 當前活動線程數:5 執行完成線程數:0 總線程數:25 ……略
根本問題是 Runnable 內部還嵌套了 Runnable ,且他們都提交到了一個線程池。下面分步驟說明問題:
runnableOuter 開始執行,runnableInner1 被提交到線程池,對 runnableInner1 的結果進行 get,致使runnableOuter 被阻塞
用圖表示大概爲:
既然明白了出錯的緣由,那麼解決起來就簡單了。這個案例告訴咱們,設計一個多線程程序,必定要自頂向下有一個良好的設計,而後再開始編碼,不可以盲目地使用多線程、線程池,這樣只會致使程序出現莫名其妙的錯誤。
其實這個我沒怎麼關注過,曾經在一次面試中被問到過。很簡單,java.util.concurrent.ThreadPoolExecutor
提供了Setter方法,能夠直接設置相關參數。按我目前的實踐經驗,幾乎沒有用到過,可是知道這個聊勝於無吧。特定的複雜場景下應該頗有用。
筆者在實際工程應用中,使用過多線程和消息隊列處理過異步任務。不少新手工程師每每弄不清楚這二者的區別。按筆者的淺見:
多線程是用來充分利用多核 CPU 以提升程序性能的一種開發技術,線程池能夠維持一個隊列保存等待處理的多線程任務,可是因爲此隊列是內存控制的,因此斷電或系統故障後未執行的任務會丟失。
消息隊列是爲消息處理而生的一門技術。其根據消費者的自身消費能力進行消費的特性使其普遍用於削峯的高併發任務處理。此外利用其去耦合的特性也能夠實現代碼上的解耦。消息隊列大多能夠對其消息進行持久化,即便斷電也可以恢復未被消費的任務並繼續處理。
以上是筆者在學習實踐以後對於多線程和消息隊列的粗淺認識,初學者切莫混淆二者的做用。
參考文獻: