速讀Java線程池

1、前言

線程池是開發中繞不開的一個知識點。
對於移動開發而言,網絡框架、圖片加載、AsyncTask、RxJava, 都和線程池有關。
正由於線程池應用如此普遍,因此也成了面試的高頻考點。
java

咱們今天就來說講線程池的基本原理和周邊知識。
先從線程的生命週期開始。面試

2、線程生命週期

線程是程序執行流的最小單元。
Java線程可分爲五個階段:服務器

  • 新建(New): 建立Thread對象,而且未調用start();
  • 就緒(Runnable): 調用start()以後, 等待操做系統調度;
  • 運行(Running): 獲取CPU時間分片,執行 run()方法中的代碼;
  • 阻塞(Blocked): 線程讓出CPU,進入等待(就緒);
  • 終止(Terminated): 天然退出或者被終止。

線程的建立和銷燬代價較高,當有大量的任務時,可複用線程,以提升執行任務的時間佔比。
如上圖,不斷地 Runnable->Runing->Blocked->Runnable, 就可避免過多的線程建立和銷燬。
此外,線程的上下文切換也是開銷比較大的,若要使用線程池,需注意設置合理的參數,控制線程併發。網絡

3、ThreadPoolExecutor

JDK提供了一個很好用的線程池的封裝:ThreadPoolExecutor併發

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼

corePoolSize:核心線程大小
maximumPoolSize:線程池最大容量(需大於等於corePoolSize,不然會拋異常)
keepAliveTime:線程執行任務結束以後的存活時間
unit:時間單位
workQueue:任務隊列
threadFactory:線程工廠
handler:拒絕策略
框架

線程池中有兩個任務容器:異步

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
複製代碼

前者用於存儲Worker,後者用於緩衝任務(Runnable)。
下面是execute方法的簡要代碼:函數

public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
        }
        // 若workQueue已滿,offer會返回false
        if (isRunning(c) && workQueue.offer(command)) {
            // ...
        } else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        int wc = workerCountOf(c);
        if (wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        workers.add(w);
        t.start();
    }
複製代碼

一個任務到來,假設此時容器workers中Worker數的數量爲c,則this

  • 一、當c < corePoolSize時,建立Worker來執行這個任務,並放入workers
  • c >= corePoolSize時,
    • 二、若workQueue未滿,則將任務放入workQueue
    • workQueue已滿,
      • 三、若c < maximumPoolSize,建立Worker來執行這個任務,並放入workers
      • 四、若c >= maximumPoolSize, 執行拒絕策略。

不少人在講線程池的時候,乾脆把workers說成「線程池」,將Worker和線程混爲一談;
不過這也無妨,能幫助理解就好,就像看到一杯水,說「這是水」同樣,不多人會說這是「杯子裝着水」。
spa

Worker和線程,比如汽車和引擎:汽車裝着引擎,汽車行駛,實際上是引擎在作功。
Worker自己實現了Runnable,而後有一個Thread和Runnable的成員;
構造函數中,將自身(this)委託給本身的成員thread
thread.start(), Worker的run()函數被回調,從而開啓 「執行任務-獲取任務」的輪迴。

private final class Worker implements Runnable{
        final Thread thread;
        Runnable firstTask;
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
    }

    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            task.run();
        }
    }

    private Runnable getTask() {
        for (;;) {
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
        }
    }
複製代碼

當線程執行完任務(task.run()結束),會嘗試去workQueue取下一個任務,
若是workQueue已經清空,則線程進入阻塞態:workQueue是阻塞隊列,若是取不到元素會block當前線程。
此時,allowCoreThreadTimeOuttrue, 或者 n > corePoolSize,workQueue等待keepAliveTime的時間,
若是時間到了尚未任務進來, 則退出循環, 線程銷燬;
不然,一直等待,直到新的任務到來(或者線程池關閉)。
這就是線程池能夠保留corePoolSize個線程存活的原理。

從線程的角度,要麼執行任務,要麼阻塞等待,或者銷燬;
從任務的角度,要麼立刻被執行,要麼進入隊列等待被執行,或者被拒絕執行。
上圖第2步,任務進入workQueue, 若是隊列爲空且有空閒的Worker的話,可立刻獲得執行。

關於workQueue,經常使用的有兩個隊列:

  • LinkedBlockingQueue(capacity):
    傳入capacity(大於0), 則LinkedBlockingQueue的容量爲capacity;
    若是不傳,默認爲Integer.MAX_VALUE,至關於無限容量(不考慮內存因素),多少元素都裝不滿。
  • SynchronousQueue 除非另外一個線程試圖移除獲取元素,不然不能添加元素。

4、 ExecutorService

爲了方便使用,JDK還封裝了一些經常使用的ExecutorService:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}
複製代碼
類型 最大併發 適用場景
newFixedThreadPool nThreads 計算密集型任務
newSingleThreadExecutor 1 串行執行的任務
newCachedThreadPool Integer.MAX_VALUE IO密集型任務
newScheduledThreadPool Integer.MAX_VALUE 定時任務,週期任務

newSingleThreadExecutor 實際上是 newFixedThreadPool的特例 (nThreads=1),
寫日誌等任務,比較適合串行執行,一者不會佔用太多資源,兩者爲保證日誌有序與完整,同一時間一個線程寫入便可。

衆多方法中,newCachedThreadPool() 是比較特別的,
一、corePoolSize = 0,
二、maximumPoolSize = Integer.MAX_VALUE,
三、workQueue 爲 SynchronousQueue。

結合上一節的分析:
當一個任務提交過來,因爲corePoolSize = 0,任務會嘗試放入workQueue;
若是沒有線程在嘗試從workQueue獲取任務,offer()會返回false,而後會建立線程執行任務;
若是有空閒線程在等待任務,任務能夠放進workQueue,可是放進去後立刻就被等待任務的線程取走執行了。
總的來講,就是有空閒線程則交給空閒線程執行,沒有則建立線程執行;
SynchronousQueue類型workQueue並不保存任務,只是一個傳遞者。
因此,最終效果爲:全部任務當即調度,無容量限制,無併發限制。

這樣的特色比較適合網絡請求任務。
OkHttp的異步請求所用線程池與此相似(除了ThreadFactory ,其餘參數如出一轍)。

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
複製代碼

5、 任務併發的估算

一臺設備上,給定一批任務,要想最快時間完成全部任務,併發量應該如何控制?
併發量過小,CPU利用率不高;
併發量太大,CPU 滿負荷,可是花在線程切換的時間增長,用於執行任務的時間反而減小。

一些文章提到以下估算公式:

M:併發數;
C:任務佔用CPU的時間;
I:等待IO完成的時間(爲簡化討論,且只考慮IO);
N:CPU核心數。

代入特定參數驗證這條公式:
一、比方說 I 接近於0,則M≈N,一個線程對應一個CPU,恰好滿負荷且較少線程切換;
二、假如 I=C,則M = 2N,兩個線程對應一個CPU,每一個線程一半時間在等待IO,一半時間在計算,也是恰好。

遺憾的是,對於APP而言這條公式並不適用:

  • 任務佔用CPU時間和IO時間沒法估算
    APP上的異步任務一般是碎片化的,而不一樣的任務性質不同,有的計算耗時多,有的IO耗時多;
    而後一樣是IO任務,比方說網絡請求,IO時間也是不可估計的(受服務器和網速影響)。
  • 可用CPU核心可能會變化
    有的設備可能會考慮省電或者熱量控制而關閉一些核心;
    你們常常吐槽的「一核有難,九核圍觀」映射的就是這種現象。

雖然該公式不能直接套用來求解最大併發,但仍有一些指導意義:
IO等待時間較多,則須要高的併發,來達到高的吞吐率;
CPU計算部分較多,則須要下降併發,來提升CPU的利用率。

換言之,就是:
計算密集型任務時控制併發小一點;
IO密集型任務時控制併發大一點。

問題來了,小一點是多小,大一點又是多大呢?
說實話這個只能憑經驗了,跟「多吃水果」,「加鹽少量」同樣,看實際狀況而定。

好比RxJava就提供了Schedulers.computation()Schedulers.io()
前者默認狀況下爲最大併發爲CPU核心數,後者最大併發爲Integer.MAX_VALUE(至關於不限制併發)。
多是做者也不知道多少才合適,因此乾脆就不限制了。
這樣其實很危險的,JVM對進程有最大線程數限制,超過則會拋OutOfMemoryError

6、總結

回顧文章的內容,大概有這些點:

  • 介紹了線程的生命週期;
  • 從線程池的參數入手,分析這些參數是如何影響線程池的運做;
  • 列舉經常使用的ExecutorService,介紹其各自特色和適用場景;
  • 對併發估算的一些理解。

文章沒有對Java線程池作太過深刻的探討,而是從使用的角度講述基本原理和周邊知識; 第二節有結合關鍵代碼做簡要分析,也是點到爲止,目的在於加深對線程池相關參數的理解, 以便在平時使用線程池的時候合理斟酌,在閱讀涉及線程池的開源代碼時也能「知其因此然」。

相關文章
相關標籤/搜索