調用方經過調用api將任務,裝進queue裏,而後會有一個機制監事queue裏有沒有task,若是有task,就分配給某個worker去執行。workers表明線程池的話.worker就是某條線程了。java
Executor框架最核心的類是ThreadPoolExecutor,他是線程池的實現類,主要由下列7個組件構成。web
package java.util.concurrent; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize, // 線程池可以使用線程數的最小值api
int maximumPoolSize, // 線程池容量的最大值tomcat
maximumPoolSize:是一個靜態變量,在變量初始化的時候,有構造函數指定.多線程
long keepAliveTime, // 當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裏把keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止。併發
TimeUnit unit, // 線程的阻塞時間單位,它的執行方法是TimeUnit.unit.Sleep(keepAliveTime);框架
內部調用了Thread.sleep()方法。可是它和Thread.sleep()方法的區別是,Thread.Sleep只能設置毫秒數,而TimeUnit.unit.Sleep()中的unit能夠換成時間單位,好比DAYS、HOURS、MINUTES,SECONDS、MILLISECONDS和NANOSECONDS。jvm
TimeUnit.MINUTES.sleep(4); // sleeping for 4 minutes
BlockingQueue<Runnable> workQueue, // 阻塞隊列,裏面是Runnable類型,線程的任務
ThreadFactory threadFactory, // 建立線程,併爲線程指定queue裏面的runnable,線程池的構造方法,支持自定義threadFactory傳入,咱們能夠本身編寫newThread()方法,來實現自定義的線程建立邏輯。函數
public interface ThreadFactory { Thread newThread(Runnable r); }
RejectedExecutionHandler handler // 當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大線程池大小且工做隊列已滿),execute()方法將要調用的Handler。oop
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
而且這些成員變量,都是volatile修飾的
private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize;
largestPoolSize: 是一個動態變量,是記錄線程曾經達到的最高值,也就是 largestPoolSize<= maximumPoolSize.
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } }
completedtaskcount:
返回已完成執行的近似任務總數。由於在計算期間任務和線程的狀態可能動態改變,因此返回值只是一個近似值,可是該值在整個連續調用過程當中不會減小。
當一個線程在workers容器中,準備remove時,線程會將本身的completedtaskcount賦值給線程池的completedtaskcount。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } }
TaskCount 線程池執行的總任務數,包括已經執行完的任務數和任務隊列中目前還須要執行的任務數
public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } }
getActiveCount();Thread.activeCount() 獲得是存活的線程數 返回值是int類型
public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } }
SingleThreadExecutor是使用單個worker線程的Executor。下面是SingleThreadExecutor的源代碼實現。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor的corePoolSize和maximumPoolSize被設置爲1。其餘參數與FixedThreadPool相同。SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊列做爲工做隊列對線程池帶來的影響與FixedThreadPool相同,這裏就不贅述了。
package java.util.concurrent; public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } }
FixedThreadPool的corePoolSize和maximumPoolSize都被設置爲建立FixedThreadPool時指定的參數nThreads。
當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裏把keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止。
FixedThreadPool的execute()方法的運行示意圖以下所示。
對上圖的說明以下。
FixedThreadPool使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)。使用無界隊列做爲工做隊列會對線程池帶來以下影響。
CacheThreadPool是一個會根據須要建立新線程的線程池。下面是建立CacheThreadPool的源代碼。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CacheThreadPool的corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲Integer.MAX_VALUE,即maximumPool是無界的。這裏把keepAliveTime設置爲60L,意味着CacheThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。
FixedThreadPool和SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列。CacheThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列,但CacheThreadPool的maximumPool是無界的。這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CacheThreadPool會不斷建立新線程。極端狀況下,CacheThreadPool會由於建立過多線程而耗盡CPU和內存資源。
對上圖的說明以下。
前面提到過,SynchronousQueue是一個沒有容量的阻塞隊列。每一個插入操做必須等待另外一個線程的對應移除操做,反之亦然。CachedThreadPool使用SynchronousQueue,把主線程提交的任務傳遞給空閒線程執行。CachedThreadPool中任務傳遞的示意圖以下所示。
執行定時任務的線程池
這四種方式,都實現了RejectedExecutionHandler接口
會拋出異常,致使當前線程退出
當咱們建立線程池時,不指定rejectedExecutionHandler時,就會默認使用AbortPolicy,當咱們經過executor.execute(runnable)任務時,可能會發生異常,並將異常直接返回給了調用者。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
當線程池的存活線程數,達到了最大值,此時又有新的請求過來,線程池會調用rejectedExecutionHandler這個接口的實現類的rejectedExecution的方法,此時該實現類正好是CallerRunsPolicy,它會讓新請求,在本身的線程上執行run方法,若是run方法消耗時間長,它會阻塞web容器的請求,影響web容器處理其餘請求的性能。
當有外部請求訪問web服務端時,tomcat會分配一條線程(tomcat默認有150個線程,能夠配置最大的爲1500個線程來接收處理請求,且這些線程之間具備隔離性不會互相影響對方)來處理這個請求,當這個請求要用到線程池,且咱們的線程池是基於CallerRunsPolicy來建立的,那麼CallerRunsPolicy會,使用當前請求的線程,來執行run方法。而當這個run方法執行時間過長時,tomcat的請求就會被佔用不放,致使沒法拿出空閒的線程去處理其餘請求,就會影響到服務端的性能。
應用場景:當咱們但願線程池滿了以後,進行阻塞,就使用CallerRunsPolicy,阻塞的是調用方的,不會往queue裏聽任務了。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
看上面的rejectedExecution方法體,頗有意思,它執行線程的方式,是r.run()而不是start()方法,這很回味無窮,緣由有兩個
咱們在main方法中,準備啓動一個線程時,若是在代碼中咱們使用thread.star()方法,jvm在執行到這行時,實際上會建立一個新的線程,來執行線程對象中的run方法,此時在執行run方法的線程,與執行main方法的線程,是兩條線程,沒有關聯。而上面調用了runnable接口實例的run方法,jvm在執行時,根本不會建立新線程去執行,而是就在當前的請求(線程)裏之心run方法,此時的run方法,根本不須要開闢或分配新線程來運行,而是當作一個普通方法來執行了。因此此時run方法卡住了,他就會卡住當前的請求,就會卡住web容器的請求。影響web容器處理其餘請求的性能。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
不作任何處理
Workers容器
0<active<coresize
當一個task準備分配給workers容器,但願調用一個線程去執行它時,若是此時容器中存活的線程數小於coresize指定線程數時,會一次性建立一條新線程來執行任務,並且新線程也會駐留在內存中。而當線程執行完任務,並不會收回,而是變成等待狀態了。
問題:何時出現activesize會超過coresize?
當coreSize向maxsize變遷的時候,不是由workers決定的,而是由queue決定的。queue裏面的task數量達到最大值的時候,coreSize就會向maxsize變遷了。咱們在建立線程池的時候。線程池的構造方法會有一個BlockingQueue<Runnable> workQueue,而後咱們初始化線程池時會指定這個queue的size,那麼調用者一邊往queue裏裝task,task也會一邊分配給workers去執行。只有當queue裏面的任務數,size達到了設置的最大size時,wokers纔會去建立更多的線程,來處理任務,建立新線程的數量,不能超過maxsize。
core<active<maxsize
條件:任務queue滿了,會新建立線程去處理任務
active == maxsize
跟rejectHandlerPolicy有關係,配置了CallerRunsPolicy就會阻塞請求方,拒絕接受任務;配置了abortPolicy就會返回異常,意思是線程數已經創夠了,不能繼續建立了;配置了discardOldPolicy就會刪除最老任務,配置了discardPolicy就什麼都不作。
本文章參考了:https://blog.csdn.net/en_joker/article/details/84973420 《併發:ThreadPoolExecutor詳解》