爲何要使用線程池?在多線程併發開發中,線程的數量較多,且每一個線程執行必定的時間後就結束了,下一個線程任務到來還須要從新建立線程,這樣線程數量特別龐大的時候,頻繁的建立線程和銷燬線程須要必定時間並且增長系統的額外開銷。基於這樣的場景,線程池就出現了,線程池能夠作到一個線程的任務處理完能夠接受下一個任務,並不須要頻繁的建立銷燬,這樣大大節省了時間和系統的開銷。java
線程池,顧名思義,就是一個池子,任務提交的到線程池後,線程池會在池子裏邊找有沒有空閒的線程,若是沒有,就會進入等待狀態,有就會分配一個空閒的線程來接受這個任務,當服務執行完,重新放回到線程池,不須要銷燬。在這種模式下,系統大大減小了建立線程個銷燬線程的資源開銷,並且一個線程能夠用來執行多個任務,咱們能夠根據系統的配置靈活調整線程池的大小,從而更高效的執行任務。多線程
線程池主要包含:Executors,Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor這些類。Executors用來建立線程池,返回ExecutorService的對象,該對象就能夠調用execute方法或者submit方法執行線程。固然,咱們也能夠本身new一個。併發
Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor的繼承關係的繼承關係爲:Executor是一個接口,裏面只有execute方法聲明,接口ExecutorService繼承Executor接口,裏面包含shutdown(),shutdownNow(),isTerminated(),submit等方法; AbstractExecutorService是ExecutorService的實現類,實現了該類中的方法,ThreadPoolExecutor繼承AbstractExecutorService。ide
RUNNING:能夠接受新任務,也能夠處理阻塞隊列裏面的任務工具
SHUTDOWN:不接受新任務,可是能夠處理阻塞隊列裏的任務oop
STOP:不在接收新任務,也再也不處理阻塞隊列裏的任務,並中斷正在處理的任務源碼分析
TIDYING:中間狀態:線程池中沒有有效的線程,調用terminate進入TERMINATE狀態測試
TERMINATE:終止狀態ui
ExecutorService executor = Executors.newFixedThreadPool(100);
經過API咱們能夠看到建立線程池的過程。this
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors這個類中基本都是靜態方法,代理了線程池的建立,大大簡化了我麼建立線程池工做量,經過方法名咱們就能夠建立咱們想要的線程池,他的內部其實都是統一的方法實現的,經過構造方法重載實現不一樣的功能,可是不看源碼,是很難知道他們的具體做用的。咱們能夠看到,這裏面有好幾種建立線程池的方法,他們有什麼區別呢?
1. newFixedThreadPool(int)方法,內部實現以下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
建立指定大小的線程池,若是超出大小,放入block隊列,即LinkedBlockingQueue隊列,默認的線程工廠爲defaultThreadFactory。
2. newWorkStealingPool(int),內部實現以下:
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
JDK1.8新增,返回ForkJoin,我的感受有一點mapReduce的思想。
3.newSingleThreadPool,源碼以下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
建立單個線程的線程池。
4. newCachedThreadPool,源碼以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
線程池長度超過處理須要,靈活回收空閒線程,若無可回收,則建立新線程。
Executors裏面還有好多方法,咱們仔細查看API就能夠了解的個大概,它是一個工具類,提供了一些靜態方法。從源碼中咱們能夠看到建立線程池返回的是return new ThreadPoolExecutor方法,它的源碼以下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
剛看源碼的時候的確很痛苦,咱們得熟悉做者的思想,他爲何要這麼寫,知道了做者的思想之後就好辦多了,我是結合英文說明來揣摩的,下面咱們看每一個參數的意思。
corePoolSize:核心線程大小,線程數一旦超過這個值,多餘的就會被放入等待隊列
maximumPoolSize:線程池中的最大線程數量,這個通常用不到,源碼中能夠看到corePoolSize和maximumPoolSize是同樣的,不一樣的是大於這個值會由丟棄處理機制來處理,不會被放入等待隊列。
keepAliveTime:保持時間,當線程沒有任務處理後,保持多久結束,默認是0
workQueue:等待隊列,默認爲LinkedBlockingQueue,這就是前面提到的等待隊列,裏面是一個HashSet,內部包裝了一層。
threadFactory:構造Thread方法,咱們能夠本身包裝和傳遞,實現newThread方法
handler:這就是前面提到的丟棄處理機制方法,實現接口RejectExecutionHandler中的方法便可。
在作項目的時候發現線程池有兩個執行方法可供調用,分別是execute和submit,那麼這兩個方法有什麼區別呢?在看submit源碼的時候能夠看到submit最終仍是會調用execute方法。
不一樣的是submit方法提供了一個Future來託管返回值的處理,當調用這個方法須要有返回值的時候,能夠用這個方法,execute只能接受Runnable做爲參數,而submit除了Runnable還能夠接收Callable。
下面來分析最重要的execute方法源碼:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 if (workerCountOf(c) < corePoolSize) { 26 if (addWorker(command, true)) 27 return; 28 c = ctl.get(); 29 } 30 if (isRunning(c) && workQueue.offer(command)) { 31 int recheck = ctl.get(); 32 if (! isRunning(recheck) && remove(command)) 33 reject(command); 34 else if (workerCountOf(recheck) == 0) 35 addWorker(null, false); 36 } 37 else if (!addWorker(command, false)) 38 reject(command); 39 }
代碼解釋:若是任務爲空,返回空異常;接下來int c = ctl.get();獲取線程池的狀態位,進入if中計算線程池的數量,若是小於線程池的核心線程數,就封裝成一個工做(work),失敗了繼續獲取線程池狀態位;if (isRunning(c) && workQueue.offer(command))判斷線程池是否正常運行,正常的話就把當前線程添加到工做隊列而且再次獲取線程池狀態位,if (! isRunning(recheck) && remove(command))若是沒有運行的線程了,就把剛纔添加的線程移除,移除成功後,使用拒絕策略reject(command); else if (workerCountOf(recheck) == 0)
addWorker(null, false);若是線程池的線程數爲0,那麼就要添加一個空任務繼續運行,以此來保證能夠繼續接收新任務而繼續運行。
else if (!addWorker(command, false))
reject(command);
若是核心線程滿了,工做隊列也飽和了,開啓非核心線程也失敗了就會拒絕,此時已經達到最大線程數了。
從英文解釋中,咱們能夠看到:基本分三步:
a) 開啓線程執行任務,直到達到最大核心線程數
b) 達到核心線程數時,將接受的新任務放入工做隊列
c) 當工做隊列也放滿後,就會開啓線程(非核心)執行任務,直到到達最大線程數
d) 以上條件都不知足時,就執行默認的拒絕策略
addWork源碼:
private boolean addWorker(Runnable firstTask, boolean core) { retry: //循環標誌 for (;;) { 死循環 int c = ctl.get();//獲取狀態位 int rs = runStateOf(c);//計算線程池的狀態 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;//這一段說的是線程池不能正常運行的狀況:線程池狀態關閉、任務爲空、隊列爲空返回錯誤 for (;;) {//死循環 int wc = workerCountOf(c);//計算線程數 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;//若是線程數超出核心線程數,返回錯誤 if (compareAndIncrementWorkerCount(c))//增長worker的數量 break retry;//回到進入該方法的循環狀態 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry;//若是狀態發生改變,就回退 // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false;//線程是否開始運行 boolean workerAdded = false;//worker是否添加成功 Worker w = null; try { w = new Worker(firstTask);//封裝成worker final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock;//加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());計算線程池狀態 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true;//worker添加成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//啓動剛剛添加的任務 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//失敗後執行的操做 } return workerStarted; }
從對源碼的翻譯中咱們能夠知道這個方法是有什麼做用,簡單說就是:建立任務,封裝任務。
進行一個簡單的測試模擬線程池的工做原理:
模擬多線程:
public class TestThreadPool implements Runnable { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
測試類:
public static void main(String[] args) { //指定3個長度的工做隊列 LinkedBlockingDeque<Runnable> workQueue=new LinkedBlockingDeque<>(3); //指定線程池參數:核心線程數,線程池最大線程數量,活躍時間,工做隊列 ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(4, 7, 90, TimeUnit.SECONDS, workQueue); for (int i = 0; i < 15; i++) { threadPoolExecutor.execute(new Thread(new TestThreadPool(), "線程:".concat(i+""))); System.out.println("線程池中活躍線程數"+threadPoolExecutor.getActiveCount()); if(workQueue.size()>0){ System.out.println("被阻塞的線程數爲:"+workQueue.size()); } } }
指定線程池核心數爲4,最大線程數量7,工做隊列最大放入3個線程,模擬15個線程併發。運行結果以下:
線程池中活躍線程數1 線程池中活躍線程數2 線程池中活躍線程數3 線程池中活躍線程數4 線程池中活躍線程數4 被阻塞的線程數爲:1 線程池中活躍線程數4 被阻塞的線程數爲:2 線程池中活躍線程數4 被阻塞的線程數爲:3 線程池中活躍線程數5 被阻塞的線程數爲:3 線程池中活躍線程數6 被阻塞的線程數爲:3 線程池中活躍線程數7 被阻塞的線程數爲:3 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[線程:10,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 7, active threads = 7, queued tasks = 3, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at main.Main.main(Main.java:19)
能夠看到,建立了4個核心線程和3個非核心線程,當線程數超出了線程池可容納的的最大數量,執行了拒絕策略Reject,說明隊列和線程池都滿了,線程池處於飽和狀態,另一個緣由是完成的線程沒有及時釋放,而是進入了休眠。
線程池工做原理:任務開始後,開始建立新的線程,當達到核心線程數後,新的任務進來不在建立新的線程,這時候把任務加入工做隊列,當達到工做隊列的長度後,新任務開始建立新的普通線程,直到數量達到線程池的最大核心數量,後面再有新任務則執行飽和策略或拒絕,拋出異常。