Java 線程池的認識和使用

多線程編程很難,難點在於多線程代碼的執行不是按照咱們直覺上的執行順序。因此多線程編程必需要創建起一個宏觀的認識。java

線程池是多線程編程中的一個重要概念。爲了可以更好地使用多線程,學習好線程池固然是必須的。git

爲何要使用線程池?

平時咱們在使用多線程的時候,一般都是架構師配置好了線程池的 Bean,咱們須要使用的時候,提交一個線程便可,不須要過多關注其內部原理。github

在學習一門新的技術以前,咱們仍是先了解下爲何要使用它,使用它可以解決什麼問題:面試

  1. 建立/銷燬線程伴隨着系統開銷,過於頻繁的建立/銷燬線程,會很大程度上影響處理效率

    例如:編程

    記建立線程消耗時間T1,執行任務消耗時間T2,銷燬線程消耗時間T3緩存

    若是T1+T3>T2,那麼是否是說開啓一個線程來執行這個任務太不划算了!多線程

    正好,線程池緩存線程,可用已有的閒置線程來執行新任務,避免了T1+T3帶來的系統開銷架構

  2. 線程併發數量過多,搶佔系統資源從而致使阻塞

    咱們知道線程能共享系統資源,若是同時執行的線程過多,就有可能致使系統資源不足而產生阻塞的狀況併發

    運用線程池能有效的控制線程最大併發數,避免以上的問題異步

  3. 對線程進行一些簡單的管理

    好比:延時執行、定時循環執行的策略等

    運用線程池都能進行很好的實現

建立一個線程池

在 Java 中,新建一個線程池對象很是簡單,Java 自己提供了工具類java.util.concurrent.Executors,可使用以下代碼建立一個固定數量線程的線程池:

ExecutorService service = Executors.newFixedThreadPool(10);

注意:以上代碼用來測試還能夠,實際使用中最好可以顯示地指定相關參數。

咱們能夠看下其內部源碼實現:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

在阿里巴巴代碼規範中,建議咱們本身指定線程池的相關參數,爲的是讓開發人員可以自行理解線程池建立中的每一個參數,根據實際狀況,建立出合理的線程池。接下來,咱們來剖析下java.util.concurrent.ThreadPoolExecutor的構造方法參數。

ThreadPoolExecutor 淺析

java.util.concurrent.ThreadPoolExecutor有多個構造方法,咱們拿參數最多的構造方法來舉例,如下是阿里巴巴代碼規範中給出的建立線程池的範例:

ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), 
                new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), 
                new ThreadPoolExecutor.AbortPolicy());

貼一張IDEA中的圖更方便看:

clipboard.png

首先最重要的幾個參數,可能就是:corePoolSizemaximumPoolSizeworkQueue了,先看下這幾個參數的解釋:

  • corePoolSize
    用於設定 thread pool 須要時刻保持的最小 core threads 的數量,即使這些 core threads 處於空閒狀態啥事都不作也不會將它們回收掉,固然前提是你沒有設置 allowCoreThreadTimeOut 爲 true。至於 pool 是如何作到保持這些個 threads 不死的,咱們稍後再說。
  • maximumPoolSize
    用於限定 pool 中線程數的最大值。若是你本身構造了 pool 且傳入了一個 Unbounded 的 queue 且沒有設置它的 capacity,那麼很差意思,最大線程數會永遠 <= corePoolSize,maximumPoolSize 變成了無效的。
  • workQueue
    該線程池中的任務隊列:維護着等待執行的 Runnable 對象。當全部的核心線程都在幹活時,新添加的任務會被添加到這個隊列中等待處理,若是隊列滿了,則新建非核心線程執行任務

因爲本文是初步瞭解線程池,因此先理解這幾個參數,上文對於這三個參數的解釋,基本上跟JDK源碼中的註釋一致(java.util.concurrent.ThreadPoolExecutor#execute裏的代碼)。

咱們編寫個程序來方便理解:

// 建立線程池
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
            new ThreadPoolExecutor.AbortPolicy());
// 等待執行的runnable   
Runnable runnable = () -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

// 啓動的任務數量
int counts = 1224;
for (int i = 0; i < counts; i++) {
    service.execute(runnable);
}

// 監控線程池執行狀況的代碼 
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service);
while (true) {
    System.out.println();

    int queueSize = tpe.getQueue().size();
    System.out.println("當前排隊線程數:" + queueSize);

    int activeCount = tpe.getActiveCount();
    System.out.println("當前活動線程數:" + activeCount);

    long completedTaskCount = tpe.getCompletedTaskCount();
    System.out.println("執行完成線程數:" + completedTaskCount);

    long taskCount = tpe.getTaskCount();
    System.out.println("總線程數:" + taskCount);

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

線程池的容量與咱們啓動的任務數量息息相關。

已知:

  • corePoolSize = 5
  • maximumPoolSize = 200
  • workQueue.size() = 1024

咱們修改同時 execute 添加到線程池的 Runnable 數量 counts:

  • counts <= corePoolSize:全部的任務均爲核心線程執行,沒有任何 Runnable 被添加到 workQueue中
當前排隊線程數:0
當前活動線程數:3
執行完成線程數:0
總線程數:3
  • corePoolSize < counts <= corePoolSize + workQueue.size():全部任務均爲核心線程執行,當核心線程處於繁忙狀態,則將任務添加到 workQueue 中等待
當前排隊線程數:15
當前活動線程數:5
執行完成線程數:0
總線程數:20
  • corePoolSize + workQueue.size() < counts <= maximumPoolSize + workQueue.size():corePoolSize 個線程由核心線程執行,超出隊列長度 workQueue.size() 的任務,將另啓動非核心線程執行
當前排隊線程數:1024
當前活動線程數:105
執行完成線程數:0
總線程數:1129
  • counts > maximumPoolSize + workQueue.size():將會報異常java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]

線程池踩坑:線程嵌套致使阻塞

此次的踩坑纔是我寫這篇文章的初衷,藉此機會好好了解下線程池的各個概念。自己這段時間在研究爬蟲,爲了儘可能提升爬蟲的效率,用到了多線程處理。因爲代碼寫得比較隨性,因此遇到了一個阻塞的問題,研究了一下才搞明白,模擬的代碼以下:

ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(1024),
        new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
        new ThreadPoolExecutor.AbortPolicy());

@Test
public void testBlock() {
    Runnable runnableOuter = () -> {
        try {
            Runnable runnableInner1 = () -> {
                try {
                    TimeUnit.SECONDS.sleep(3); // 模擬比較耗時的爬蟲操做
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            Future<?> submit = service.submit(runnableInner1);

            submit.get(); // 實際業務中,runnableInner2須要用到此處返回的參數,因此必須get

            Runnable runnableInner2 = () -> {
                try {
                    TimeUnit.SECONDS.sleep(5); // 模擬比較耗時的爬蟲操做
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            Future<?> submit2 = service.submit(runnableInner2);
            submit2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    };

    for (int i = 0; i < 20; i++) {
        service.execute(runnableOuter);
    }

    ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service);

    while (true) {
        System.out.println();

        int queueSize = tpe.getQueue().size();
        System.out.println("當前排隊線程數:" + queueSize);

        int activeCount = tpe.getActiveCount();
        System.out.println("當前活動線程數:" + activeCount);

        long completedTaskCount = tpe.getCompletedTaskCount();
        System.out.println("執行完成線程數:" + completedTaskCount);

        long taskCount = tpe.getTaskCount();
        System.out.println("總線程數:" + taskCount);

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

線程池是前文的線程池,參數徹底不變。線程的監控代碼也一致。當咱們運行這個單元測試的時候,會發現打印出來的結果一直是以下:

當前排隊線程數:15
當前活動線程數:5
執行完成線程數:0
總線程數:20

當前排隊線程數:20
當前活動線程數:5
執行完成線程數:0
總線程數:25

當前排隊線程數:20
當前活動線程數:5
執行完成線程數:0
總線程數:25

……略

根本問題是 Runnable 內部還嵌套了 Runnable ,且他們都提交到了一個線程池。下面分步驟說明問題:

  1. runnableOuter 被提交到了線程池
  2. runnableOuter 開始執行,runnableInner1 被提交到線程池,對 runnableInner1 的結果進行 get,致使runnableOuter 被阻塞

    1. 於此同時,更多的 runnableOuter 被提交到線程池,核心線程被 runnableOuter 和 runnableInner1 佔滿,多餘的線程 runnableInner2 被加入 workQueue 中等待執行
  3. runnableInner2 被提交到線程池,可是由於核心線程已滿,被提交到了 workQueue ,也處於阻塞狀態,此時對 runnableInner2 的結果進行 get,致使 runnableOuter 被阻塞
  4. runnableOuter 被阻塞,沒法釋放核心線程資源,而 runnableInner2 又由於沒法獲得核心線程資源,只能呆在 workQueue 裏,致使整個程序卡死,沒法返回。(有點相似死鎖,互相佔有了資源,對方不釋放,我也不釋放)

用圖表示大概爲:

clipboard.png

既然明白了出錯的緣由,那麼解決起來就簡單了。這個案例告訴咱們,設計一個多線程程序,必定要自頂向下有一個良好的設計,而後再開始編碼,不可以盲目地使用多線程、線程池,這樣只會致使程序出現莫名其妙的錯誤。

動態修改 corePoolSize & maximumPoolSize

其實這個我沒怎麼關注過,曾經在一次面試中被問到過。很簡單,java.util.concurrent.ThreadPoolExecutor提供了Setter方法,能夠直接設置相關參數。按我目前的實踐經驗,幾乎沒有用到過,可是知道這個聊勝於無吧。特定的複雜場景下應該頗有用。

線程池和消息隊列

筆者在實際工程應用中,使用過多線程和消息隊列處理過異步任務。不少新手工程師每每弄不清楚這二者的區別。按筆者的淺見:

多線程是用來充分利用多核 CPU 以提升程序性能的一種開發技術,線程池能夠維持一個隊列保存等待處理的多線程任務,可是因爲此隊列是內存控制的,因此斷電或系統故障後未執行的任務會丟失。

消息隊列是爲消息處理而生的一門技術。其根據消費者的自身消費能力進行消費的特性使其普遍用於削峯的高併發任務處理。此外利用其去耦合的特性也能夠實現代碼上的解耦。消息隊列大多能夠對其消息進行持久化,即便斷電也可以恢復未被消費的任務並繼續處理。

以上是筆者在學習實踐以後對於多線程和消息隊列的粗淺認識,初學者切莫混淆二者的做用。

參考文獻:

  1. Deep thinking in Java thread pool
  2. 線程池,這一篇或許就夠了
相關文章
相關標籤/搜索