在web開發中,服務器須要接受並處理請求,因此會爲一個請求來分配一個線程來進行處理。若是每次請求都新建立一個線程的話實現起來很是簡便,可是存在一個問題:java
若是併發的請求數量很是多,但每一個線程執行的時間很短,這樣就會頻繁的建立和銷燬線程,如此一來會大大下降系統的效率。可能出現服務器在爲每一個請求建立新線程和銷燬線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。web
那麼有沒有一種辦法使執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務呢?安全
這就是線程池的目的了。線程池爲線程生命週期的開銷和資源不足問題提供瞭解決方案。經過對多個任務重用線程,線程建立的開銷被分攤到了多個任務上。服務器
何時使用線程池?多線程
使用線程池的好處併發
引用自 http://ifeve.com/java-threadpool/ 的說明:框架
Java中的線程池是用ThreadPoolExecutor類來實現的. 本文就結合JDK 1.8對該類的源碼來分析一下這個類內部對於線程的建立, 管理以及後臺任務的調度等方面的執行原理。異步
先看一下線程池的類圖:ide
Executor框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將」任務提交」與」任務如何運行」分離開來的機制。oop
J.U.C中有三個Executor接口:
public interface Executor { void execute(Runnable command); }
Executor接口只有一個execute方法,用來替代一般建立或啓動線程的方法。例如,使用Thread來建立並啓動線程的代碼以下:
Thread t = new Thread(); t.start();
使用Executor來啓動線程執行任務的代碼以下:
Thread t = new Thread(); executor.execute(t);
對於不一樣的Executor實現,execute()方法多是建立一個新線程並當即啓動,也有多是使用已有的工做線程來運行傳入的任務,也多是根據設置線程池的容量或者阻塞隊列的容量來決定是否要將傳入的線程放入阻塞隊列中或者拒絕接收傳入的線程。
ExecutorService接口繼承自Executor接口,提供了管理終止的方法,以及可爲跟蹤一個或多個異步任務執行情況而生成 Future 的方法。增長了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。若是須要支持即時關閉,也就是shutDownNow()方法,則任務須要正確處理中斷。
ScheduledExecutorService擴展ExecutorService接口並增長了schedule方法。調用schedule方法能夠在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService接口還定義了按照指定時間間隔按期執行任務的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
ThreadPoolExecutor繼承自AbstractExecutorService,也是實現了ExecutorService接口。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
ctl
是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),這裏能夠看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。
下面再介紹下線程池的運行狀態. 線程池一共有五種狀態, 分別是:
下圖爲線程池的狀態轉換過程:
這裏還有幾個對ctl進行計算的方法:
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
構造方法中的字段含義以下:
corePoolSize:核心線程數量,當有新任務在execute()方法提交時,會執行如下判斷:
因此,任務提交時,判斷的順序爲 corePoolSize –> workQueue –> maximumPoolSize。
execute()方法用來提交任務,代碼以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * clt記錄着runState和workerCount */ int c = ctl.get(); /* * workerCountOf方法取出低29位的值,表示當前活動的線程數; * 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中; * 並把任務添加到該線程中。 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷; * 若是爲true,根據corePoolSize來判斷; * 若是爲false,則根據maximumPoolSize來判斷 */ if (addWorker(command, true)) return; /* * 若是添加失敗,則從新獲取ctl值 */ c = ctl.get(); } /* * 若是當前線程池是運行狀態而且任務添加到隊列成功 */ if (isRunning(c) && workQueue.offer(command)) { // 從新獲取ctl值 int recheck = ctl.get(); // 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了, // 這時須要移除該command // 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回 if (! isRunning(recheck) && remove(command)) reject(command); /* * 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法 * 這裏傳入的參數表示: * 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動; * 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷; * 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 若是執行到這裏,有兩種狀況: * 1. 線程池已經不是RUNNING狀態; * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。 * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize; * 若是失敗則拒絕該任務 */ else if (!addWorker(command, false)) reject(command); }
簡單來講,在執行execute()方法時若是狀態一直是RUNNING時,的執行過程以下:
workerCount < corePoolSize
,則建立並啓動一個線程來執行新提交的任務;workerCount >= corePoolSize
,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;workerCount >= corePoolSize && workerCount < maximumPoolSize
,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;workerCount >= maximumPoolSize
,而且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。這裏要注意一下addWorker(null, false);
,也就是建立一個線程,但並無傳入任務,由於任務已經被添加到workQueue中了,因此worker在執行的時候,會直接從workQueue中獲取任務。因此,在workerCountOf(recheck) == 0
時執行addWorker(null, false);
也是爲了保證線程池在RUNNING狀態下必需要有一個線程來執行任務。
execute方法執行流程以下:
addWorker方法的主要工做是在線程池中建立一個新的線程並執行,firstTask參數 用於指定新增的線程執行的第一個任務,core參數爲true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize,代碼以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取運行狀態 int rs = runStateOf(c); /* * 這個if判斷 * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務; * 接着判斷如下3個條件,只要有1個不知足,則返回false: * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務 * 2. firsTask爲空 * 3. 阻塞隊列不爲空 * * 首先考慮rs == SHUTDOWN的狀況 * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false; * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false, * 由於隊列中已經沒有任務了,不須要再添加線程了 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取線程數 int wc = workerCountOf(c); // 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false; // 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較, // 若是爲false則根據maximumPoolSize來比較。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 嘗試增長workerCount,若是成功,則跳出第一個for循環 if (compareAndIncrementWorkerCount(c)) break retry; // 若是增長workerCount失敗,則從新獲取ctl的值 c = ctl.get(); // Re-read ctl // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根據firstTask來建立Worker對象 w = new Worker(firstTask); // 每個Worker對象都會建立一個線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING狀態; // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。 // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一個HashSet workers.add(w); int s = workers.size(); // largestPoolSize記錄着線程池中出現過的最大線程數量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啓動線程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
注意一下這裏的t.start()
這個語句,啓動時會調用Worker類中的run方法,Worker自己實現了Runnable接口,因此一個Worker類型的對象也是一個線程。
線程池中的每個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象,看一下Worker的定義:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker類繼承了AQS,並實現了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存傳入的任務;thread是在調用構造方法時經過ThreadFactory來建立的線程,是用來處理任務的線程。
在調用構造方法時,須要把任務傳入,這裏經過getThreadFactory().newThread(this);
來新建一個線程,newThread方法傳入的參數是this,由於Worker自己繼承了Runnable接口,也就是一個線程,因此一個Worker對象在啓動的時候會調用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現獨佔鎖的功能。爲何不使用ReentrantLock來實現呢?能夠看到tryAcquire方法,它是不容許重入的,而ReentrantLock是容許重入的:
因此,Worker繼承自AQS,用於判斷線程是否空閒以及是否能夠被中斷。
此外,在構造方法中執行了setState(-1);
,把state變量設置爲-1,爲何這麼作呢?是由於AQS中默認的state是0,若是剛建立了一個Worker對象,尚未執行任務時,這時就不該該被中斷,看一下tryAquire方法:
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
tryAcquire方法是根據state是不是0來判斷的,因此,setState(-1);
將state設置爲-1是爲了禁止在執行任務前對線程進行中斷。
正由於如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置爲0.
在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的代碼以下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 獲取第一個任務 Runnable task = w.firstTask; w.firstTask = null; // 容許中斷 w.unlock(); // allow interrupts // 是否由於異常退出循環 boolean completedAbruptly = true; try { // 若是task爲空,則經過getTask來獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
這裏說明一下第一個if判斷,目的是:
這裏要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設置爲STOP,回顧一下STOP狀態:
不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態。
STOP狀態要中斷線程池中的全部線程,而這裏使用Thread.interrupted()
來判斷是否中斷是爲了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,由於Thread.interrupted()方法會復位中斷的狀態。
總結一下runWorker方法的執行過程:
task.run()
執行任務;這裏的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現。
completedAbruptly變量來表示在執行任務過程當中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。
getTask方法用來從阻塞隊列中取任務,代碼以下:
private Runnable getTask() { // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /* * 若是線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行如下判斷: * 1. rs >= STOP,線程池是否正在stop; * 2. 阻塞隊列是否爲空。 * 若是以上條件知足,則將workerCount減1並返回null。 * 由於若是當前線程池狀態的值是SHUTDOWN或以上時,不容許再向阻塞隊列中添加任務。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // timed變量用於判斷是否須要進行超時控制。 // allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時; // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量; // 對於超過核心線程數量的這些線程,須要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法; * timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時 * 接下來判斷,若是有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1; * 若是減1失敗,則返回重試。 * 若是wc == 1時,也就說明當前線程是線程池中惟一的一個線程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null; * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。 * */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 若是 r == null,說明已經超時,timedOut設置爲true timedOut = true; } catch (InterruptedException retry) { // 若是獲取任務時當前線程發生了中斷,則設置timedOut爲false並返回循環重試 timedOut = false; } } }
這裏重要的地方是第二個if判斷,目的是控制線程池的有效線程數量。由上文中的分析能夠知道,在執行execute方法時,若是當前線程池的線程數量超過了corePoolSize且小於maximumPoolSize,而且workQueue已滿時,則能夠增長工做線程,但這時若是超時沒有獲取到任務,也就是timedOut爲true的狀況,說明workQueue已經爲空了,也就說明了當前線程池中不須要那麼多線程來執行任務了,能夠把多於corePoolSize數量的線程銷燬掉,保持線程數量在corePoolSize便可。
何時會銷燬?固然是runWorker方法執行完以後,也就是Worker中的run方法執行完,由JVM自動回收。
getTask方法返回null時,在runWorker方法中會跳出while循環,而後會執行processWorkerExit方法。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是completedAbruptly值爲true,則說明線程執行時出現了異常,須要將workerCount減1; // 若是線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操做,這裏就沒必要再減了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; // 從workers中移除,也就表示着從線程池中移除了一個工做線程 workers.remove(w); } finally { mainLock.unlock(); } // 根據線程池狀態進行判斷是否結束線程池 tryTerminate(); int c = ctl.get(); /* * 當線程池是RUNNING或SHUTDOWN狀態時,若是worker是異常結束,那麼會直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker; * 若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit執行完以後,工做線程被銷燬,以上就是整個工做線程的生命週期,從execute方法開始,Worker使用ThreadFactory建立新的工做線程,runWorker經過getTask獲取任務,而後執行任務,若是getTask返回null,進入processWorkerExit方法,整個線程結束,如圖所示:
tryTerminate方法根據線程池狀態進行判斷是否結束線程池,代碼以下:
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 當前線程池的狀態爲如下幾種狀況時,直接返回: * 1. RUNNING,由於還在運行中,不能中止; * 2. TIDYING或TERMINATED,由於線程池中已經沒有正在運行的線程了; * 3. SHUTDOWN而且等待隊列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 若是線程數量不爲0,則中斷一個空閒的工做線程,並返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 這裏嘗試設置狀態爲TIDYING,若是設置成功,則調用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默認什麼都不作,留給子類實現 terminated(); } finally { // 設置狀態爲TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers(ONLY_ONE);
的做用是由於在getTask方法中執行workQueue.take()
時,若是不執行中斷會一直阻塞。在下面介紹的shutdown方法中,會中斷全部空閒的工做線程,若是在執行shutdown時工做線程沒有空閒,而後又去調用了getTask方法,這時若是workQueue中沒有任務了,調用workQueue.take()
時就會一直阻塞。因此每次在工做線程結束時調用tryTerminate方法來嘗試中斷一個空閒工做線程,避免在隊列爲空時取任務一直阻塞的狀況。
shutdown方法要將線程池切換到SHUTDOWN狀態,並調用interruptIdleWorkers方法請求中斷全部空閒的worker,最後調用tryTerminate嘗試結束線程池。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判斷 checkShutdownAccess(); // 切換狀態爲SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試結束線程池 tryTerminate(); }
這裏思考一個問題:在runWorker方法中,執行任務時對Worker對象w進行了lock操做,爲何要在執行任務的時候對每一個工做線程都加鎖呢?
下面仔細分析一下:
workQueue.take()
進行阻塞;workQueue.take()
後會一直阻塞而不會被銷燬,由於在SHUTDOWN狀態下不容許再有新的任務添加到workQueue中,這樣一來線程池永遠都關閉不了了;workQueue.take()
時,若是發現當前線程在執行以前或者執行期間是中斷狀態,則會拋出InterruptedException,解除阻塞的狀態;下面就來分析一下interruptIdleWorkers方法。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers遍歷workers中全部的工做線程,若線程沒有被中斷tryLock成功,就中斷該線程。
爲何須要持有mainLock?由於workers是HashSet類型的,不能保證線程安全。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷全部工做線程,不管是否空閒 interruptWorkers(); // 取出隊列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow方法與shutdown方法相似,不一樣的地方在於:
shutdownNow方法執行完以後調用tryTerminate方法,該方法在上文已經分析過了,目的就是使線程池的狀態設置爲TERMINATED。
經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用
經過這些方法,能夠對線程池進行監控,在ThreadPoolExecutor類中提供了幾個空方法,如beforeExecute方法,afterExecute方法和terminated方法,能夠擴展這些方法在執行前或執行後增長一些新的操做,例如統計線程池的執行任務的時間等,能夠繼承自ThreadPoolExecutor來進行擴展。
本文比較詳細的分析了線程池的工做流程,整體來講有以下幾個內容:
在向線程池提交任務時,除了execute方法,還有一個submit方法,submit方法會返回一個Future對象用於獲取返回值,有關Future和Callable請自行了解一下相關的文章,這裏就不介紹了。