線程池是開發中繞不開的一個知識點。
對於移動開發而言,網絡框架、圖片加載、AsyncTask、RxJava, 都和線程池有關。
正由於線程池應用如此普遍,因此也成了面試的高頻考點。
java
咱們今天就來說講線程池的基本原理和周邊知識。
先從線程的生命週期開始。面試
線程是程序執行流的最小單元。
Java線程可分爲五個階段:服務器
線程的建立和銷燬代價較高,當有大量的任務時,可複用線程,以提升執行任務的時間佔比。
如上圖,不斷地 Runnable->Runing->Blocked->Runnable, 就可避免過多的線程建立和銷燬。
此外,線程的上下文切換也是開銷比較大的,若要使用線程池,需注意設置合理的參數,控制線程併發。網絡
JDK提供了一個很好用的線程池的封裝:ThreadPoolExecutor併發
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
corePoolSize:核心線程大小
maximumPoolSize:線程池最大容量(需大於等於corePoolSize,不然會拋異常)
keepAliveTime:線程執行任務結束以後的存活時間
unit:時間單位
workQueue:任務隊列
threadFactory:線程工廠
handler:拒絕策略
框架
線程池中有兩個任務容器:異步
private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
複製代碼
前者用於存儲Worker,後者用於緩衝任務(Runnable)。
下面是execute方法的簡要代碼:函數
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
}
// 若workQueue已滿,offer會返回false
if (isRunning(c) && workQueue.offer(command)) {
// ...
} else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize))
return false;
Worker w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
t.start();
}
複製代碼
一個任務到來,假設此時容器workers中Worker數的數量爲c,則this
不少人在講線程池的時候,乾脆把workers說成「線程池」,將Worker和線程混爲一談;
不過這也無妨,能幫助理解就好,就像看到一杯水,說「這是水」同樣,不多人會說這是「杯子裝着水」。
spa
Worker和線程,比如汽車和引擎:汽車裝着引擎,汽車行駛,實際上是引擎在作功。
Worker自己實現了Runnable,而後有一個Thread和Runnable的成員;
構造函數中,將自身(this)委託給本身的成員thread;
當thread.start(), Worker的run()函數被回調,從而開啓 「執行任務-獲取任務」的輪迴。
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
}
}
複製代碼
當線程執行完任務(task.run()結束),會嘗試去workQueue取下一個任務,
若是workQueue已經清空,則線程進入阻塞態:workQueue是阻塞隊列,若是取不到元素會block當前線程。
此時,allowCoreThreadTimeOut爲true, 或者 n > corePoolSize,workQueue等待keepAliveTime的時間,
若是時間到了尚未任務進來, 則退出循環, 線程銷燬;
不然,一直等待,直到新的任務到來(或者線程池關閉)。
這就是線程池能夠保留corePoolSize個線程存活的原理。
從線程的角度,要麼執行任務,要麼阻塞等待,或者銷燬;
從任務的角度,要麼立刻被執行,要麼進入隊列等待被執行,或者被拒絕執行。
上圖第2步,任務進入workQueue, 若是隊列爲空且有空閒的Worker的話,可立刻獲得執行。
關於workQueue,經常使用的有兩個隊列:
爲了方便使用,JDK還封裝了一些經常使用的ExecutorService:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
複製代碼
類型 | 最大併發 | 適用場景 |
---|---|---|
newFixedThreadPool | nThreads | 計算密集型任務 |
newSingleThreadExecutor | 1 | 串行執行的任務 |
newCachedThreadPool | Integer.MAX_VALUE | IO密集型任務 |
newScheduledThreadPool | Integer.MAX_VALUE | 定時任務,週期任務 |
newSingleThreadExecutor 實際上是 newFixedThreadPool的特例 (nThreads=1),
寫日誌等任務,比較適合串行執行,一者不會佔用太多資源,兩者爲保證日誌有序與完整,同一時間一個線程寫入便可。
衆多方法中,newCachedThreadPool() 是比較特別的,
一、corePoolSize = 0,
二、maximumPoolSize = Integer.MAX_VALUE,
三、workQueue 爲 SynchronousQueue。
結合上一節的分析:
當一個任務提交過來,因爲corePoolSize = 0,任務會嘗試放入workQueue;
若是沒有線程在嘗試從workQueue獲取任務,offer()會返回false,而後會建立線程執行任務;
若是有空閒線程在等待任務,任務能夠放進workQueue,可是放進去後立刻就被等待任務的線程取走執行了。
總的來講,就是有空閒線程則交給空閒線程執行,沒有則建立線程執行;
SynchronousQueue類型workQueue並不保存任務,只是一個傳遞者。
因此,最終效果爲:全部任務當即調度,無容量限制,無併發限制。
這樣的特色比較適合網絡請求任務。
OkHttp的異步請求所用線程池與此相似(除了ThreadFactory ,其餘參數如出一轍)。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
複製代碼
一臺設備上,給定一批任務,要想最快時間完成全部任務,併發量應該如何控制?
併發量過小,CPU利用率不高;
併發量太大,CPU 滿負荷,可是花在線程切換的時間增長,用於執行任務的時間反而減小。
一些文章提到以下估算公式:
M:併發數;
C:任務佔用CPU的時間;
I:等待IO完成的時間(爲簡化討論,且只考慮IO);
N:CPU核心數。
代入特定參數驗證這條公式:
一、比方說 I 接近於0,則M≈N,一個線程對應一個CPU,恰好滿負荷且較少線程切換;
二、假如 I=C,則M = 2N,兩個線程對應一個CPU,每一個線程一半時間在等待IO,一半時間在計算,也是恰好。
遺憾的是,對於APP而言這條公式並不適用:
雖然該公式不能直接套用來求解最大併發,但仍有一些指導意義:
IO等待時間較多,則須要高的併發,來達到高的吞吐率;
CPU計算部分較多,則須要下降併發,來提升CPU的利用率。
換言之,就是:
作計算密集型任務時控制併發小一點;
作IO密集型任務時控制併發大一點。
問題來了,小一點是多小,大一點又是多大呢?
說實話這個只能憑經驗了,跟「多吃水果」,「加鹽少量」同樣,看實際狀況而定。
好比RxJava就提供了Schedulers.computation()和Schedulers.io(),
前者默認狀況下爲最大併發爲CPU核心數,後者最大併發爲Integer.MAX_VALUE(至關於不限制併發)。
多是做者也不知道多少才合適,因此乾脆就不限制了。
這樣其實很危險的,JVM對進程有最大線程數限制,超過則會拋OutOfMemoryError。
回顧文章的內容,大概有這些點:
文章沒有對Java線程池作太過深刻的探討,而是從使用的角度講述基本原理和周邊知識; 第二節有結合關鍵代碼做簡要分析,也是點到爲止,目的在於加深對線程池相關參數的理解, 以便在平時使用線程池的時候合理斟酌,在閱讀涉及線程池的開源代碼時也能「知其因此然」。