以前使用線程池都是經過Executors.new...這種方式建立,由於Doug Lea已經給咱們作了相對通用的設置,這麼作的話簡單又安全。可是有時候根據不一樣的場景可能須要進行一些自定義的操做。
好比,我須要一個初始狀況下,使用10條核心線程運行任務,可是考慮到服務器的資源有限,咱們但願限制在最多隻能使用20條線程。我大概是這麼定義的:java
ThreadPoolExecutor service = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
在運行的時候發現活躍線程數(ActiveCount)最大值永遠等於核心線程池數(CorePoolSize),因而翻了下代碼:安全
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
發如今實例化LinkedBlockingQueue的時候,這個隊列的默認值是Integer的最大值,那這能夠認爲是一個無界隊列,基本上是沒法被填滿的,那這就等於一個線程池數爲10的固定線程池了。若是隊列不滿那麼,就永遠走不到新建工做線程的邏輯裏面去。這也就解釋了,爲何執行ActiveCount的永遠爲corePoolSize。因此爲了使隊列有界,從新定義隊列長度:服務器
ThreadPoolExecutor service = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(50));
執行一段時間後發現拋出異常:this
java.util.concurrent.RejectedExecutionException
問題的緣由是什麼呢?在對隊列設置了長度以後,當corePoolSize數量的線程都在運行狀態會調用內部的addWorker建立非核心線程。線程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 當workQueue達到容量上限的時候,offer()方法會返回false,進而走到下面addWorker的這個邏輯分支。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
addWorker中有一條邏輯以下,當大於等於最大線程池數量的時候返回false:code
int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
若是返回false就會執行reject()方法,而reject()所在的拒絕策略是默認的AbortPolicy,因此會拋出異常:隊列
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
絕大多數的任務應該都不容許丟棄,因此咱們還須要指定拒絕策略,好比JDK已經提供的實現CallerRunsPolicy。固然也能夠根據具體的場景自定義拒絕策略,好比將任務阻塞插入工做隊列中:資源
RejectedExecutionHandler handler = (r, executor1)-> { try { executor1.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace(); } };
關於線程池的相關感悟就這些。rem