咱們在關閉線程池的時候會使用shutdown()和shutdownNow(),那麼問題來了:java
爲了解決這些疑問咱們須要分析java線程池的原理。ide
日常咱們在建立線程池常用的方式以下:oop
ExecutorService executorService = Executors.newFixedThreadPool(5);
看下newFixedThreadPool源碼, 其實Executors是個工廠類,內部是new了一個ThreadPoolExecuto:源碼分析
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
參數的意義就不介紹了,網上有不少內容,看源碼註釋也能夠明白。ui
線程池中類的繼承關係以下:this
將一個Runnable放到線程池執行有兩種方式,一個是調用ThreadPoolExecutor#submit,一個是調用ThreadPoolExecutor#execute。其實submit是將Runnable封裝成了一個RunnableFuture,而後再調用execute,最終調用的仍是execute,因此咱們這裏就只從ThreadPoolExecutor#execute開始分析。.net
ThreadPoolExecutor中有個重要的屬性是ctl線程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示狀態,低29位表示線程池中線程的多少 private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29爲減1,即最終獲得爲高3位爲0,低29位爲1的數字,做爲掩碼,是二進制運算中經常使用的方法 private static final int RUNNING = -1 << COUNT_BITS; // 高三位111 private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000 private static final int STOP = 1 << COUNT_BITS; // 高三位001 private static final int TIDYING = 2 << COUNT_BITS; // 高三位010 private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即計算線程池狀態 private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即計算線程數量 private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl
ThreadPoolExecutor中使用32位Integer來表示線程池的狀態和線程的數量,其中高3位表示狀態,低29位表示數量。若是對二進制運行不熟悉能夠參考:二進制運算。從上也能夠看出線程池有五種狀態,咱們關心前3中狀態rest
明白了ctl和線程池的狀態後咱們來具體看下execute的處理邏輯code
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 線程數量小於coresize,那麼就調用addWorker if (addWorker(command, true)) // 這裏知道,返回true就不往下走了 return; c = ctl.get(); } // 不知足上述條件,即線程數量 >= coreSize,或者addWorker返回fasle,那麼走下面的邏輯 if (isRunning(c) && workQueue.offer(command)) { // 能夠看到是往blockingqueue中放task int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是不知足上述條件,即blockingqueue也放不進去,那麼就走下面的邏輯 else if (!addWorker(command, false)) reject(command); }
從上面的代碼我們能夠看到線程池處理線程的基本思路是: 若是線程數量小於coresize那麼就執行task,不然就放到queue中,若是queue也放不下就走下面addWorker,若是也失敗了,那麼就調用reject策略。固然還涉及一些細節,須要進一步分析。
execute中反覆調用的是addWorker
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 計算線程池狀態 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && // 先忽略 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 線程數量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 可見若是超過了運行的最大線程數量則返回false return false; if (compareAndIncrementWorkerCount(c)) // 若是成功,線程數量確定加1 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); // 將task封裝成了Worker final Thread t = w.thread; // 來獲取worker的thread if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 將worker添加到hashset中報存,關閉的時候要使用 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 通過一些檢查, 啓動了work的thread t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); // 若是線程啓動失敗,則將線程數減1 } return workerStarted; }
上面的代碼看起來比較複雜,可是若是咱們忽略具體的細節,從大體思路上看,其實也比較簡單。上面代碼的主要思路就是:除了一些狀態檢查外,首先將線程數量加1,而後將runnable分裝成一個worker,去啓動worker線程,若是啓動失敗則再將線程數量減1。返回false的緣由多是線程數量大於容許的數量。因此addWorker調用成功,則會啓動一個work線程,且線程池中線程數量加1
woker是線程池中真正的線程實體。線程池中的線程不是自定義的Runnable實現的線程,而是woker線程,worker在run方法裏調用了自定義的Runnable的run方法。
Worker繼承了AQS,並實現了runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); // 這個時候回頭看看addWorker中t.start(), 就明白了啓動的實際是一個Woker線程,而不是用戶定義的Runnable } public void run() { runWorker(this); } }
Worker中firstTask存儲了用戶定義的Runnable,thread是以他自身爲參數的Thread對象。getThreadFactory()默認返回是Executors#DefaultThreadFactory,用來新建線程,並定義了線程名稱的前綴等:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; // } public Thread newThread(Runnable r) { // 調用後新建一個線程 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
Worker的run方法調用了runWorker,並將自身做爲參數傳了進去,下面看看問題的關鍵:runWorker:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 注意這裏的while循環,這裏很關鍵。這裏注意,若是兩個條件都知足了,那麼線程就結束了 w.lock(); // 注意worker繼承了AQS,至關於本身實現了鎖,這個在關閉線程的時候有用 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); // 僅僅是回調了Runnable的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; // 重點,task執行完後就被置位null w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); // 注意while循環結束後worker線程就結束了 } }
runWorker中有個while循環,while中判斷條件爲(task != null || (task = getTask()) != null)。假設咱們按照正常的邏輯,即task != null,則會調用task.run方法,執行完run方法後而後在finally中task被置爲null;接着又進入while循環判斷,此次task == null,因此不符合第一個判斷條件,則會繼續判斷 task == getTask()) != null。咱們來看下getTask作了什麼。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 當調用shutdown()方法的時候,線程狀態就爲shutdown了; 當調用shutdownow()的時候,線程狀態就爲stop了 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { // 經過死循環設置狀態 int wc = workerCountOf(c); // 設置容許core線程timeout或者線程數量大於coresize,則容許線程超時 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 若是線程數量 <= 最大線程數 且 沒有超時和容許超時 則跳出死循環 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { // 這裏是關鍵,若是容許超時則調用poll從queue中取出task,不然就調用take可阻塞的獲取task Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 獲取到task則返回,而後runWorker的while循環就繼續執行,並調用task的run方法 return r; timedOut = true; // 不然設置爲timeOut,繼續循環,可是下次循環會走到if (compareAndDecrementWorkerCount(c)) 處,並返回null。 } catch (InterruptedException retry) { timedOut = false; } } }
忽略掉具體細節,getTask的總體思路是: 從blockqueue中拿去task,若是queue中沒有task則分兩種狀況:
這個時候回頭再看下runWorker,若是task != null,那麼就會執行task的run方法,執行完後task就會爲被置爲null,再次進入while循環執行getTask阻塞在這裏了。經過這種方式保留住了線程。若是while循環結束了,那麼worker線程也就結束了。
分析到這裏咱們再來看下addWoker。addWorker能夠將第一個參數設置爲null。例如ThreadPoolExecutor#prestartAllCoreThreads:
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) // addWorker第一個參數是null ++n; return n; }
通過前面的分析,咱們知道addWoker用來啓動一個worker線程,worker線程調用runWorker來執行,而runWorker中有個while循環,判斷條件是(task != null || (task = getTask()) != null)。由於咱們傳入的task爲null,因此就會判斷task = getTask()) != null,而getTask就是去blockqueue中拿去數據,若是沒有任務就會阻塞住。這個時候就是一個阻塞的線程在等待task的到來了。因此傳入參數爲null表示建立一個空的線程,什麼都不執行。
已經知道了線程池內部的大概工做狀況,咱們再來看下若是全部core線程都建立好了且處於空置狀態,這個時候新放入一個線程的執行流程。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // core線程都建立好了,因此判斷條件不知足 if (addWorker(command, true)) return; c = ctl.get(); } // 會走到這裏,會經過offer往blockingqueue裏放置一個task。這個時候阻塞的core線程會經過blockingqueue的take拿到task執行,相似一個生產者消費者的狀況 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若是blockingqueue添加失敗,則建立線程直到maxsize else if (!addWorker(command, false)) reject(command); }
可見,線程和execute經過blockingqueue來通訊,而不是其餘方式,execute往blockingqueue中放置task,線程經過take來獲取。總體線程池的邏輯以下圖
這個時候咱們終於能夠來看看shutdown和shutdownNow了
看下shutdown
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 重點,將線程狀態置爲shutdown,這樣getTask等workqueue爲空後就返回null了 interruptIdleWorkers(); // 重點 onShutdown(); // 什麼都沒作 } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 線程沒有中斷 且 獲取到worker的鎖 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); // 調用interrup,中斷線程 } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
總結下就是: shutdown會把它被調用前放到線程池中的task所有執行完。
再來看下shutdownNow
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 重點,將線程狀態置爲stop interruptWorkers(); // 重點 tasks = drainQueue(); // 重點 } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 沒有去獲取woker的鎖 try { t.interrupt(); } catch (SecurityException ignore) { } } } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; List<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); // 將blockingqueue中的task清空 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
從上面的代碼能夠看出:
總結就是shutdownNow比較粗暴,調用他後,他會將全部以前提交的任務都interrupt,且將blockingqueue中的task清空
另外就是不管是shutdown仍是shutdownNow都是調用Thread的interrupt()方法。若是task不響應中斷或者忽略中斷標記,那麼這個線程就不會被終止。例如在run中執行如下邏輯
poolExecutor.execute(new Runnable() { @Override public void run() { while (true) { System.out.println("b"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.printf("不處理"); // 忽略中斷 } } } });
運行結果是,即便調用了shutdownNow也終止不了線程運行
b 0 不處理b b b b b ....