目錄java
在Web開發中,若是要密集處理多個任務時,相對於每次都一個建立線程去執行任務,新建線程來執行任務相對來講是個更好的選擇,體如今如下三點:源碼分析
下面從最經常使用的線程池ThreadPoolExecutor的源碼分析如何實現線程池。ui
Executor是最基礎的執行接口,只提供了一個execute(Runnable command)提交任務方法;ExecutorService接口繼承了Executor,在其上作了一些shutdown()、submit()的擴展,能夠說是真正的線程池接口AbstractExecutorService抽象類實現了ExecutorService接口中的大部分方法;TheadPoolExecutor繼承了AbstractExecutorService,是線程池的具體實現。
this
public class ThreadPoolExecutor extends AbstractExecutorService { // 線程池的控制狀態(用來表示線程池的運行狀態(整形的高3位)和運行的worker數量(低29位)) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; // 最大工做線程數量(2^29 - 1) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 線程運行狀態,總共有5個狀態,須要3位來表示(因此偏移量的29 = 32 - 3) private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 阻塞隊列,存放提交給線程池的任務 private final BlockingQueue<Runnable> workQueue; // 可重入鎖 private final ReentrantLock mainLock = new ReentrantLock(); // 存放工做線程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 終止條件 private final Condition termination = mainLock.newCondition(); // 最大線程池容量 private int largestPoolSize; // 已完成任務數量 private long completedTaskCount; // 線程工廠 private volatile ThreadFactory threadFactory; // 拒絕執行處理器 private volatile RejectedExecutionHandler handler; // 線程等待運行時間 private volatile long keepAliveTime; // 是否運行核心線程超時 private volatile boolean allowCoreThreadTimeOut; // 核心池的大小 private volatile int corePoolSize; // 最大線程池大小 private volatile int maximumPoolSize; // 默認拒絕執行處理器 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); }
線程池自己有兩個很重要的狀態信息:線程池的運行狀態和工做線程數,這兩個狀態信息都包含在變量ctl(int型,32位)中:ctl的高3位表示線程狀態runState,低29位表示工做線程worker的數量workCount。線程狀態信息以下:線程
核心參數含義以下:code
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
execute方法是向線程池提交任務的,此時線程池的狀態爲RUNNING(其餘狀態不接收新提交的任務),主要判斷:對象
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //ctl記錄線程池狀態信息和線程池線程數 int c = ctl.get(); //比較當前線程數是否小於corePoolSize,若是小於則新建一個線程放入線程池中 if (workerCountOf(c) < corePoolSize) { //成功加入則返回 if (addWorker(command, true)) return; //加入失敗,從新獲取ctl c = ctl.get(); } //若是當前線程數大於等於corePoolSize,判斷線程池是否仍在運行,是的話加入阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次檢查線程池是否仍在運行 if (! isRunning(recheck) && remove(command)) reject(command); /** 線程池在運行可是工做線程數爲0,此時可能阻塞隊列有任務但線程池沒有工做線程池, * 若是配置了參數allowCoreThreadTimeOut(默認是false)爲true可能由於核心線程執行 * 完任務且阻塞隊列也沒有線程等待獲取任務,此時屬於空閒線程,因爲超時會回收核心線程 **/ else if (workerCountOf(recheck) == 0) /** 傳false將會在addWorker方法中判斷線程池的工做線程數量和最大線程數量作比較 * 傳一個空的任務,開啓一個工做線程,但這個工做線程會發現當前的任務是空,而後會去隊列中取任務 * 這樣就避免了線程池的狀態是running,並且隊列中還有任務,但線程池卻不執行隊列中的任務 **/ addWorker(null, false); } /** * 若是執行到這裏,有兩種狀況: * 1. 線程池已經不是RUNNING狀態; * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。 * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲 * maximumPoolSize;若是失敗則拒絕該任務 **/ else if (!addWorker(command, false)) reject(command); }
addWorker方法用與建立工做線程,firstTask表示第一個任務,core爲true那麼線程數受corePoolSize制約,爲false則受maximumPoolSize制約。執行流程:blog
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //運行狀態 int rs = runStateOf(c); /** * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務 * 知足rs >= SHUTDOWN條件後接着判斷如下3個條件,只要有1個不知足,則返回false: * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保 * 存的任務 2. firsTask爲空 3. 阻塞隊列不爲空 **/ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //當前工做線程數 int wc = workerCountOf(c); //檢查線程數量是否超出 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 嘗試CAS增長workerCount,若是成功,則跳出第一個for循環 if (compareAndIncrementWorkerCount(c)) break retry; //CAS失敗,從新獲取ctl的值 c = ctl.get(); // Re-read ctl // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行 if (runStateOf(c) != rs) continue retry; } } //CAS增長workCount成功,退出循環進入到這裏 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根據firstTask來建立Worker對象 w = new Worker(firstTask); // 每個Worker對象都會建立一個線程 final Thread t = w.thread; if (t != null) { //上鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING狀態; // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。 // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //將工做線程work加入到HashSet對象workers workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //啓動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
線程池的工做線程是經過包裝成Worker對象,Worker類自己既實現了Runnable接口,又繼承了同步器AQS,實現了一個簡易的不可重入的互斥鎖,經過同步狀態state控制中斷:繼承
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{ private static final long serialVersionUID = 6138294804551838833L; //工做線程 final Thread thread; //新建Worker傳入的任務command,可能爲null Runnable firstTask; //執行完的任務數量 volatile long completedTasks; //同步狀態state爲0表明爲鎖定,state爲1表明鎖定,state爲-1表明初始狀態 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //建立線程 this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }
runWork是工做線程執行任務的方法,執行過程以下:接口
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //同步狀態state設置爲0,容許中斷 w.unlock(); // allow interrupts //用於標識是否工做線程因爲異常忽然終止,在執行任務拋出異常或線程被中斷兩種狀況爲true boolean completedAbruptly = true; try { //循環取任務執行 while (task != null || (task = getTask()) != null) { //上鎖,表示正在工做線程正在執行任務,不能響應中斷 w.lock(); /** * 確保在線程池狀態在STOP及以上時,纔會被設置中斷標示,不然清除中斷標示,判斷如下兩個條件: * 一、若是線程池狀態>=stop,且當前線程沒有設置中斷狀態,wt.interrupt() * 二、若是一開始判斷線程池狀態<stop,但Thread.interrupted()爲true,即線程已經被中斷,又 * 清除了中斷標示,再次判斷線程池狀態是否>=stop(可能調用了shutdownNow關閉線程池) **/ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
當工做線程數達到corePoolSize,後續提交的任務就會放到阻塞隊列workQueue中,工做線程經過getTask方法從阻塞隊列取出任務,執行如下步驟:
private Runnable getTask() { // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 1.rs>SHUTDOWN 因此rs至少等於STOP,這時再也不處理隊列中的任務,無論workQueue是否爲空都返回null * 2.rs = SHUTDOWN 因此rs>=STOP確定不成立,這時還須要處理隊列中的任務除非workQueue爲空 * 若是以上條件知足,則將workerCount減1並返回null。由於若是當前線程池狀態的值是SHUTDOWN * 或以上時,不容許再向阻塞隊列中添加任務。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * timed表示工做線程是否須要剔除,爲true * allowCoreThreadTimeOut默認爲false,表示核心線程不作超時控制 * wc > corePoolSize 超過核心線程數 * timed爲true下面的if條件經過返回null,從而剔除掉超過corePoolSize數目的線程,使線程數 * 回覆corePoolSize **/ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 條件1: * wc > maximumPoolSize 檢查是否超出maximumPoolSize,線程池可能重置了maximumPoolSize * timed && timedOut 當前線程須要超時控制且上次取任務超時爲true * 條件2:若是線程數量大於1,或者阻塞隊列是空的 * 兩個條件都爲true把workCount減一,返回null **/ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //CAS失敗從新循環 continue; } try { /** * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在 * keepAliveTime時間內沒有獲取到任務,則返回null; * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。 **/ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //若是r==null,說明是超時了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
當getTask返回null,會跳出runWork的while循環,此時工做線程的run方法執行完畢,線程會終止,同時會執行processWorkerExit方法,步驟以下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //若是是忽然終止,從新調整workCount if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; //從集合中移出Worker對象 workers.remove(w); } finally { mainLock.unlock(); } // 根據線程池狀態進行判斷是否結束線程池 tryTerminate(); int c = ctl.get(); //線程狀態小於STOP,即線程池處於RUNNING或SHUTDOWN狀態 if (runStateLessThan(c, STOP)) { //檢查是否異常終止 if (!completedAbruptly) { //若是allowCoreThreadTimeOut=true,而且等待隊列有任務,至少保留一個worker; //若是allowCoreThreadTimeOut=false,workerCount很多於corePoolSize。 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //忽然終止,添加一個Worker addWorker(null, false); } }
關閉線程池,線程池狀態由RUNNING變爲SHUTDOWN,只處理已有任務再也不接收新提交的任務,中斷空閒線程。
爲何要中斷空閒線程:當線程池狀態爲RUNNING可是阻塞隊列爲空,allowCoreThreadTimeOut爲默認值false(既不支持核心線程超時回收),那麼工做線程必然堵塞在workQueue.take()方法上,而調用了shutdown()方法後線程池狀態變爲SHUTDOWN不接收新提交的任務,那麼阻塞隊列永遠爲空,因此須要經過中斷讓線程由阻塞狀態返回null。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //檢查是否有關閉線程池權限 checkShutdownAccess(); //把線程池運行狀態切換爲SHUTDOWN advanceRunState(SHUTDOWN); //中斷空閒線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
中斷空閒線程。
private void interruptIdleWorkers() { //false代表中斷全部空閒線程 interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // t.isInterrupted()檢查線程是否已經中斷過 // w.tryLock() runWork在執行任務會上鎖,執行完解鎖去阻塞隊列得到任務,若是tryLock成功 //說明沒有執行任務,是空閒線程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
根據線程池狀態嘗試關閉線程池。這裏解釋一下interruptIdleWorkers(ONLY_ONE):
當到達workerCountOf(c) != 0這個判斷時,說明線程池處於SHUTDOWN狀態,且阻塞隊列已經爲空,這是若判斷成立,那麼還有工做線程等待在線程池上,會中斷一個空閒線程,這個被中斷的空閒線程的Worker返回null又會調用tryTerminate,從而把線程池關閉的消息傳給每一個線程,回收空閒線程。
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 當前線程池的狀態爲如下幾種狀況時,直接返回: * 1. RUNNING,由於還在運行中,不能中止; * 2. TIDYING或TERMINATED,由於線程池中已經沒有正在運行的線程了; * 3. SHUTDOWN而且等待隊列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //工做線程數不爲0 if (workerCountOf(c) != 0) { // Eligible to terminate //中斷一個空閒線程(等待在阻塞隊列上獲取任務的線程) //中斷的線程在回收Worker時還會調用tryTerminate方法,從而回收空閒線程 interruptIdleWorkers(ONLY_ONE); return; } //到這裏說明工做線程數workCount爲0,線程池狀態置爲TIDYING final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
關閉線程池,運行狀態修改成 STOP, 中斷全部線程; 並返回未處理的任務
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //更改線程池狀態 advanceRunState(STOP); // 中斷全部工做線程,不管是否空閒 interruptWorkers(); //取出阻塞隊列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
不論線程是否空閒,中斷全部線程。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; /** * getState() >= 0 同步狀態state=-1線程還沒啓動,大於等於0說明線程以及啓動,處於 * 執行任務或空閒狀態。 * (t = thread) != null 線程不爲null * !t.isInterrupted() 檢查線程是否被中斷過。 **/ if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
本文分析了線程池ThreadPoolExecutor的實現,主要從向線程池提交任務和關閉線程池這兩個方法分析的,瞭解了線程池複用線程資源減小線程建立和切換的開銷背後的祕密。