我相信你們在項目中或多或少的都使用過線程,而線程是寶貴的資源,不能頻繁的建立,應當給其餘任務進行復用,因此就有了咱們的線程池。java
你知道咱們如何建立線程池嗎?程序員
這我固然知道了,JDK主要提供了三種建立線程池的方法web
線程池如何使用緩存
ExecutorService threadPool = Executors.newFixedThreadPool(5);
threadPool.execute(() -> {
System.out.println("執行任務");
});
threadPool.shutdown();
複製代碼
你給我講講線程池的原理呢?多線程
上面說的建立線程池的方法實際上都是經過建立ThreadPoolExecutor這個類來實現的,因此咱們直接看這個類的實現原理便可。
首先來看看它的構造方法app
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
複製代碼
先說下它這幾個核心參數的含義測試
固然只是知道這幾個參數也沒有什麼太大的做用,咱們仍是要着眼全局來看ThreadPoolExecutor類。this
首先來認識下線程池中定義的狀態,它們一直貫穿在整個主體spa
// ctl存儲了兩個值,一個是線程池的狀態,
// 另外一個是活動線程數(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池最多容許2^29-1個(大概5億)線程存在,
// 固然首先要你的系統能新建這麼多個
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
從上面的成員變量的定義咱們能夠知道,線程池最多容許5億個(2^29-1)個線程活動,那麼爲何不是2^31-1呢?由於設計者以爲這個值已經夠大了,若是未來以爲這是一個瓶頸的話,會把這個換成Long類型。線程
同時線程池這裏也存在了五個狀態,它們解決着線程池的生命週期。
狀態流轉圖以下:
咱們知道當咱們執行一個task的時候,調用的是execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 若是工做線程數小於核心線程數(corePoolSize),則建立一個工做線程執行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 若是當前是running狀態,而且任務隊列可以添加任務
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 若是不處於running狀態了(使用者可能調用了shutdown方法),
// 則將剛纔添加到任務隊列的任務移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 若是當前沒有工做線程,
// 則新建一個工做線程來執行任務(任務已經被添加到了任務隊列)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 隊列已經滿了的狀況下,則新啓動一個工做線程來執行任務
else if (!addWorker(command, false))
reject(command);
}
複製代碼
而在addWorker方法中還存在有一些必要的判斷邏輯,好比當前線程池是不是非running狀態,隊列是否爲空等條件,固然最主要的邏輯仍是判斷當前工做線程數量是否大於maximumPoolSize以及啓動工做線程執行任務。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
for (;;) {
int wc = workerCountOf(c);
// 1. 判斷當前工做線程是否知足條件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 2. 增長工做線程數量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// 3. 建立工做線程
w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
if (workerAdded) {
// 4. 運行工做線程
t.start();
workerStarted = true;
}
return workerStarted;
}
複製代碼
因此,總結下線程池的工做流程以下:
若是你以爲上面很差記,我給你講個火鍋店的故事你就更加明白了。
之前有個火鍋店,叫作朱帥帥火鍋,老闆是個剛辭掉程序員工做出來創業的帥小夥子,火鍋店不大,只能擺上10張桌子(corePoolSize),若是吃火鍋的來得早就能夠去店裏面坐(店裏有空調),來晚了,店裏面坐滿了,後面來的人就要排隊了(workQueue)。
排隊的人數愈來愈多,朱帥帥一看不是辦法,就給外面擺了幾張臨時桌子(非核心工做線程),讓客人在外面吃。若是店裏面有人吃完了或者外面臨時桌子吃完了就讓排隊的人去吃。後面時間晚了,沒有排隊的人了,老闆就讓人撤了外面的臨時桌子,畢竟擺在外面也不太好,並且還怕城管來。若是生意特別好,又來了特別多的人,已經超出火鍋店的服務能力了,就只能喊他們去別家了。
上面的故事,你要品,細細的品,最後你會發現,代碼來源於生活。
上面一直說到工做線程,工做線程究竟是個什麼鬼?其實工做線程指的就是咱們的Worker類,它是ThreadPoolExecutor中的私有類
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 運行worker的線程(new Thread(this))
final Thread thread;
// 須要執行的任務
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
複製代碼
能夠看到Woker不只繼承了AbstractQueuedSynchronizer(實現獨佔鎖功能),還實現了Runnable接口。
實際上線程池中的每個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象
Worker用本身做爲task構造一個線程,同時把外層任務賦值給本身的task成員變量,至關於對task作了一個包裝。
addWorker()方法中執行了worker.thread.start(),實際上執行的就是Worker的runWorker方法。
final void runWorker(Worker w) {
// 1. 獲取任務開始執行任務,若是獲取不到任務,當前的worker就會被JVM回收
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 判斷線程池是否關閉
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 判斷是否須要進行超時控制。
// allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時;
// wc > corePoolSize,表示當前線程池中的工做線程數量大於核心線程數量;
// 對於超過核心線程數量的這些線程,須要進行超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 3. wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法;
// timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 4. timed若是爲true,則經過阻塞隊列的poll方法進行超時控制,
//若是在keepAliveTime時間內沒有獲取到任務,則返回null。若是爲false,則直接阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
}
}
複製代碼
上面的代碼中尤爲須要注意的是getTask()中的第3點,它的目的是控制線程池有效的工做線程數量。 從以前的分析咱們能夠知道,若是當前線程池的工做線程數量超過了corePoolSize且小於maximumPoolSize,而且workQueue已滿,則能夠增長工做線程,但這時若是超時沒有獲取到任務,也就是timedOut爲true的狀況,說明workQueue已經爲空了,也就說明了線程池中不須要那麼多線程來執行任務了,能夠把多於corePoolSize數量的線程銷燬掉,保持線程數量在corePoolSize便可。
你剛纔說到拒絕策略,都有哪些拒絕策略呀?
主要有下面4種拒絕策略
你給我說說你開頭說的經過Executors建立的線程池三者有何不一樣嗎?
固定線程數量的線程池,corePoolSize等於maximumPoolSize,採用的阻塞隊列是LinkedBlockingQueue,是一個無界隊列,當任務量忽然很大,線程池來不及處理,就會將任務一直添加到隊列中,就有可能致使內存溢出。
建立單個線程的線程池,corePoolSize = maximumPoolSize = 1,也採用的LinkedBlockingQueue這個無界隊列,當任務量很大,線程池來不及處理,就有可能會致使內存溢出。
建立可緩存的線程池,corePoolSize = 1,maximumPoolSize = Interger.MAX_VALUE;可是使用的是SynousQueue,這個隊列比較特殊,內部沒有結構來存儲任何元素,因此若是任務數很大,而建立的那個線程(corePoolSize=1)遲遲沒有處理完成任務,就會一直建立線程來處理,也有OOM的可能。
cacheThreadPool中的cache其實指的就是SynousQueue,當往這個隊列插入數據的時候,若是沒有任務來取,插入這個過程會被阻塞。
你既然說了都有可能OOM,那麼應該如何建立線程池呢?
實際使用中不建議經過Executors來建立線程池,而是經過 new ThreadPoolExecutor
的方式來建立,而隊列也不建議使用無界隊列,而要使用有界隊列,好比ArrayBlockingQueue。而拒絕策略這個就看你本身需求了(系統提供的若是不知足,就本身寫一個)
同時對於核心線程數的設置也不是越大越好,只能說根據你的需求來設置這個值,通常來說能夠根據下面兩點來進行合理配置
固然啦,這個考慮是不少方面的,不只僅和程序有關,還和硬件等資源有關,總之就是在測試的時候多多調試。
你們不要覺得我上面說的是句廢話,請你自信一點,把覺得去掉。