爲何要使用線程池? java
合理利用線程池可以帶來三個好處。第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如指掌。——摘自http://www.infoq.com/cn/articles/java-threadPool。node
能夠經過線程池的如下屬性監控線程池的當前狀態:程序員
getTaskCount():線程池已經執行的和未執行的任務總數,由於統計的過程當中可能會發生變化,該值是個近似值;算法
getCompletedTaskCount():已完成的任務數量,是個近似值,該值小於等於TaskCount;數組
getLargestPoolSize():線程池曾經的最大線程數量,能夠經過該值判斷線程池是否滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過;安全
getPoolSize():線程池當前的線程數量;多線程
getActiveCount():線程池中活動的線程數(正在執行任務),是個近似值。併發
還能夠經過重寫線程池提供的hook方法(beforeExecute、afterExecute和terminated)進行監控,例如監控任務的平均執行時間、最大執行時間和最小執行時間等。異步
程序員能夠經過重寫鉤子 hook 方法(如beforeExecute)實現ThreadPoolExecutor的擴展。ide
擴展現例:添加了簡單的暫停/恢復功能的子類
1 class PausableThreadPoolExecutor extends ThreadPoolExecutor { 2 private boolean isPaused; //標誌是否被暫停 3 private ReentrantLock pauseLock = new ReentrantLock(); //訪問isPaused時須要加鎖,保證線程安全 4 private Condition unpaused = pauseLock.newCondition(); 5 6 public PausableThreadPoolExecutor(...) { super(...); } 7 8 //beforeExecute爲ThreadPoolExecutor提供的hood方法 9 protected void beforeExecute(Thread t, Runnable r) { 10 super.beforeExecute(t, r); 11 pauseLock.lock(); 12 try { 13 while (isPaused) 14 unpaused.await(); 15 } catch(InterruptedException ie) { 16 t.interrupt(); 17 } finally { 18 pauseLock.unlock(); 19 } 20 } 21 //暫停 22 public void pause() { 23 pauseLock.lock(); 24 try { 25 isPaused = true; 26 } finally { 27 pauseLock.unlock(); 28 } 29 } 30 //取消暫停 31 public void resume() { 32 pauseLock.lock(); 33 try { 34 isPaused = false; 35 unpaused.signalAll(); 36 } finally { 37 pauseLock.unlock(); 38 } 39 } 40 }
1 //ctl是控制線程池狀態的一個變量,包含有效的線程數(workerCount)和線程池的運行狀態(runState)兩部分信息。高3位表示runState,低29位表示workerCount。 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 private static final int COUNT_BITS = Integer.SIZE - 3; //表示workerCount的位數,29位。 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //線程數的上限,(2^29)-1,大約5億 5 6 // runState is stored in the high-order bits 7 private static final int RUNNING = -1 << COUNT_BITS; //能接收新任務和處理隊列中的任務 8 private static final int SHUTDOWN = 0 << COUNT_BITS; //不能接收新任務,但能夠處理隊列中的任務 9 private static final int STOP = 1 << COUNT_BITS; //不能接收新任務,不能處理隊列中的任務,中斷正在執行的任務 10 private static final int TIDYING = 2 << COUNT_BITS; //全部的線程都被終止,workerCount爲0時會進入該狀態. 11 private static final int TERMINATED = 3 << COUNT_BITS; //terminated()方法完成後將進入該狀態。
以上ThreadPoolExecutor的成員變量表示線程池的狀態,狀態信息存儲在ctl變量中,ctl包含有效線程數(workerCount)和線程池運行狀態(runState)兩部分信息,ctl的高3位表示runState,低29位表示workerCount。ctl初始值爲RUNNING狀態且線程數爲0。
線程池運行狀態的轉換以下:
1)線程池在RUNNING狀態下調用shutdown()方法會進入到SHUTDOWN狀態,(finalize()方法也會調用shutdownNow())。
2)在RUNNING和SHUTDOWN狀態下調用 shutdownNow() 方法會進入到STOP狀態。
3)在SHUTDOWN狀態下,當阻塞隊列爲空且線程數爲0時進入TIDYING狀態;在STOP狀態下,當線程數爲0時進入TIDYING狀態。
4)在TIDYING狀態,調用terminated()方法完成後進入TERMINATED狀態。
1 //阻塞隊列 2 private final BlockingQueue<Runnable> workQueue; 3 //可重入鎖。訪問woker線程和相關記錄信息時須要獲取該鎖 4 private final ReentrantLock mainLock = new ReentrantLock(); 5 //包含所有worker線程集合,Accessed only under mainLock,HashSet是非線程安全的. 6 private final HashSet<Worker> workers = new HashSet<Worker>(); 7 private final Condition termination = mainLock.newCondition(); 8 //記錄最大的線程數量,Accessed only under mainLock. 9 private int largestPoolSize; 10 //完成任務的數量,Accessed only under mainLock. 11 private long completedTaskCount; 12 13 14 //如下全部程序員能夠控制的參數都被聲明爲volatile變量,保證可見性。 15 16 //建立線程的工廠 17 private volatile ThreadFactory threadFactory; 18 //線程池飽和或關閉時的處理策略(提供了四種飽和策略) 19 private volatile RejectedExecutionHandler handler; 20 //超出corePoolSize數量的空閒線程存活時間(allowCoreThreadTimeOut=true時有效) 21 private volatile long keepAliveTime; 22 //allowCoreThreadTimeOut=false,線程不會由於空閒時間超過keepAliveTime而被中止 23 private volatile boolean allowCoreThreadTimeOut; 24 //核心線程數 25 private volatile int corePoolSize; 26 //最大線程數,此變量的最大上限爲CAPACITY 27 private volatile int maximumPoolSize;
1、線程池核心線程數和最大線程數
ThreadPoolExecutor 將根據 corePoolSize (核心線程數)和 maximumPoolSize(最大線程數)設置的邊界自動調整線程池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。若是將 maximumPoolSize 設置爲基本的無界值(如 Integer.MAX_VALUE),則容許池適應任意數量的併發任務。在大多數狀況下,核心和最大池大小僅基於構造函數來設置,不過也可使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。
2、任務隊列
workQueue是一個阻塞隊列,用來存儲執行的任務。全部的BlockingQueue均可用於workQueue。
若是有效的線程數小於 corePoolSize,則線程池首選添加新線程,而不進行排隊。
若是有效的線程數大於等於 corePoolSize,則線程池首選將任務加入隊列,而不添加新的線程。
若是隊列已滿,則建立新的線程,當線程數超出 maximumPoolSize 時,任務將被拒絕。
經常使用的三種阻塞隊列的實現:
1)直接提交。SynchronousQueue是一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態。它將任務直接提交給線程而不存儲任務。直接提交一般要求不限制 maximumPoolSizes 以免拒絕新提交的任務。Executors.newCachedThreadPool使用了這個隊列。
2)無界隊列。LinkedBlockingQueue是一個基於鏈表結構的阻塞隊列,默認的大小是Integer.MAX_VALUE。建立的線程就不會超過 corePoolSize,會使maximumPoolSize 的值無效。
3)有界隊列。ArrayBlockingQueue是一個基於數組結構的有界阻塞隊列。有助於防止資源耗盡,可是可能較難調整和控制。
3、飽和策略
當 Executor 已經關閉,或者 Executor 將有限邊界用於最大線程和工做隊列容量且已經飽和時,在方法 execute(Runnable) 中提交的新任務將被拒絕。線程池提供了4種飽和策略:
1)AbortPolicy。默認的飽和策略,直接拋出RejectedExecutionException異常。
2)CallerRunsPolicy。用調用者所在的線程來執行任務,此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。
3)DiscardPolicy。直接丟棄任務。
4)DiscardOldestPolicy。若是執行程序還沒有關閉,則丟棄阻塞隊列中最靠前的任務,而後重試執行新任務(若是再次失敗,則重複此過程)。
也可使用自定義的 RejectedExecutionHandler 類,但須要很是當心,尤爲是當策略僅用於特定容量或排隊策略時。
4、threadFactory
使用 ThreadFactory 建立新線程,默認狀況下在同一個 ThreadGroup 中一概使用 Executors.defaultThreadFactory() 建立線程,這些線程具備相同的 NORM_PRIORITY 優先級和非守護進程狀態。經過自定義的 ThreadFactory建立新線程,能夠改變線程的名稱、線程組、優先級、守護進程狀態等。
5、workers用來存儲工做線程,注意HashSet<Worker>是非線程安全的,訪問時須要獲取mainLock;
6、mainLock是一個獨佔式可重入鎖,用來保證訪問workers和其餘監控變量(如largestPoolSize、completedTaskCount等)的線程安全。
7、keepAliveTime爲線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。allowCoreThreadTimeout變量表示是否容許核心線程超時,若是allowCoreThreadTimeOut=false,那麼當線程空閒時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize;若是allowCoreThreadTimeOut=true,那麼當線程空閒時間達到keepAliveTime時,線程會退出,直到線程數量=0。
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); //獲取線程池的狀態(runState和workerCount) 25 //若是線程數小於corePoolSize,新建一個線程執行該任務。 26 if (workerCountOf(c) < corePoolSize) { 27 if (addWorker(command, true)) 28 return; 29 c = ctl.get(); 30 } 31 //若是線程池是運行狀態,而且添加任務到隊列成功(隊列未滿) 32 if (isRunning(c) && workQueue.offer(command)) { 33 int recheck = ctl.get(); 34 //再次判斷線程池的運行狀態,若是不是運行狀態,須要從隊列刪除該任務。使用拒絕策略處理該任務。 35 if (! isRunning(recheck) && remove(command)) 36 reject(command); 37 //若是線程數爲0,執行addWorker方法。參數爲null的緣由是任務已經加入到隊列,新建的線程從隊列取任務執行便可。 38 else if (workerCountOf(recheck) == 0) 39 addWorker(null, false); 40 } 41 //線程池不是RUNNING狀態或隊列已滿,嘗試新建一個線程執行該任務。若是失敗則拒絕該任務。 42 else if (!addWorker(command, false)) 43 reject(command); 44 }
線程被封裝在Worker類中。
1 //參數firstTask表示新建線程執行的第一個任務。若是firstTask爲null,表示 2 //若是參數core=true,把corePoolSize做爲線程數上限的判斷條件;若是爲false,把maximumPoolSize做爲線程數上限的判斷條件 3 private boolean addWorker(Runnable firstTask, boolean core) { 4 retry: 5 for (;;) { 6 int c = ctl.get(); 7 int rs = runStateOf(c); 8 /* 9 * rs >= SHUTDOWN表示再也不接受新任務。 10 * 1)線程池的運行狀態爲SHUTDOWN;2)firstTask == null;3)阻塞隊列不爲空,只有這三個條件同時知足纔不返回false 11 */ 12 // Check if queue empty only if necessary. 13 if (rs >= SHUTDOWN && 14 ! (rs == SHUTDOWN && 15 firstTask == null && 16 ! workQueue.isEmpty())) 17 return false; 18 19 //自旋CAS遞增workerCount 20 for (;;) { 21 int wc = workerCountOf(c); 22 //若是線程數超過上限,返回false。若是參數core=true,把corePoolSize做爲線程數上限的判斷條件;若是爲false,把maximumPoolSize做爲線程數上限的判斷條件 23 if (wc >= CAPACITY || 24 wc >= (core ? corePoolSize : maximumPoolSize)) 25 return false; 26 //CAS遞增線程數。若是成功,跳出最外層循環;若是失敗,且運行狀態沒有改變,繼續內層循環直到成功。 27 if (compareAndIncrementWorkerCount(c)) 28 break retry; 29 //判斷runState是否改變,若是改變則繼續外層循環 30 c = ctl.get(); // Re-read ctl 31 if (runStateOf(c) != rs) 32 continue retry; 33 // else CAS failed due to workerCount change; retry inner loop 34 } 35 } 36 37 //走到這說明須要新建線程,且workerCount更新成功 38 //下面是新建Worker的過程。 39 boolean workerStarted = false; //新建的Worker是否啓動標識 40 boolean workerAdded = false; //新建的Worker是否被添加到workers標識 41 Worker w = null; 42 try { 43 final ReentrantLock mainLock = this.mainLock; 44 w = new Worker(firstTask); //新建Worker 45 final Thread t = w.thread; 46 //什麼狀況下線程會爲null呢?在ThreadFactory建立線程失敗時可能會出現。 47 if (t != null) { 48 mainLock.lock(); //獲取mainLock鎖。對workers(HashSet非線程安全)和largestPoolSize更新必須加鎖 49 try { 50 // Recheck while holding lock. 51 // Back out on ThreadFactory failure or if 52 // shut down before lock acquired. 53 int c = ctl.get(); 54 int rs = runStateOf(c); 55 /* 56 * 若是運行狀態是RUNNING,或者運行狀態是SHUTDOWN且firstTask爲null,纔將新建的Worker添加到workers 57 */ 58 if (rs < SHUTDOWN || 59 (rs == SHUTDOWN && firstTask == null)) { 60 if (t.isAlive()) // precheck that t is startable 61 throw new IllegalThreadStateException(); 62 workers.add(w); 63 //更新largestPoolSize,標識線程池曾經出現過的最大線程數 64 int s = workers.size(); 65 if (s > largestPoolSize) 66 largestPoolSize = s; 67 workerAdded = true; 68 } 69 } finally { 70 mainLock.unlock(); //釋放mainLock鎖 71 } 72 if (workerAdded) { 73 //啓動線程 74 t.start(); 75 workerStarted = true; 76 } 77 } 78 } finally { 79 //新建的Worker未啓動,進行失敗處理 80 if (! workerStarted) 81 addWorkerFailed(w); 82 } 83 return workerStarted; 84 }
每一個線程被封裝爲一個Worker類實例。Worker類繼承了AbstractQueuedSynchronizer,並實現了一個互斥非重入鎖。Worker類同時繼承了Runnable,Worker類的實例也是一個線程。
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 /** 6 * This class will never be serialized, but we provide a 7 * serialVersionUID to suppress a javac warning. 8 */ 9 private static final long serialVersionUID = 6138294804551838833L; 10 11 /** Thread this worker is running in. Null if factory fails. */ 12 final Thread thread; //處理任務的線程 13 /** Initial task to run. Possibly null. */ 14 Runnable firstTask; //傳入的任務 15 /** Per-thread task counter */ 16 volatile long completedTasks; //完成的任務數 17 18 /** 19 * Creates with given first task and thread from ThreadFactory. 20 * @param firstTask the first task (null if none) 21 */ 22 Worker(Runnable firstTask) { 23 //同步狀態初始化爲-1,在執行runWorker方法前禁止中斷當前線程 24 setState(-1); // inhibit interrupts until runWorker 25 this.firstTask = firstTask; 26 this.thread = getThreadFactory().newThread(this); //經過ThreadFactory建立線程 27 } 28 29 /** Delegates main run loop to outer runWorker */ 30 public void run() { 31 runWorker(this); 32 } 33 34 // Lock methods 35 // 36 // The value 0 represents the unlocked state. 37 // The value 1 represents the locked state. 38 //實現了一個非重入互斥鎖,state=0表示解鎖狀態,state=1表示加鎖狀態 39 protected boolean isHeldExclusively() { 40 return getState() != 0; 41 } 42 43 protected boolean tryAcquire(int unused) { 44 if (compareAndSetState(0, 1)) { 45 setExclusiveOwnerThread(Thread.currentThread()); 46 return true; 47 } 48 return false; 49 } 50 51 protected boolean tryRelease(int unused) { 52 setExclusiveOwnerThread(null); 53 setState(0); 54 return true; 55 } 56 57 public void lock() { acquire(1); } 58 public boolean tryLock() { return tryAcquire(1); } 59 public void unlock() { release(1); } 60 public boolean isLocked() { return isHeldExclusively(); } 61 62 void interruptIfStarted() { 63 Thread t; 64 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 65 try { 66 t.interrupt(); 67 } catch (SecurityException ignore) { 68 } 69 } 70 } 71 }
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 //Worker初始化時同步狀態置爲-1,此處進行解鎖操做目的是將同步狀態置爲0,容許中斷。 6 w.unlock(); // allow interrupts 7 boolean completedAbruptly = true; //是否由於異常跳出循環 8 try { 9 //若是firstTask爲null則經過getTask()方法從隊列中獲取。 10 //正常狀況下,會一直執行While循環,若是隊列爲空,getTask()方法中會阻塞當前線程,getTask()返回null時會跳出循環 11 while (task != null || (task = getTask()) != null) { 12 w.lock(); //加Worker鎖 13 // If pool is stopping, ensure thread is interrupted; 14 // if not, ensure thread is not interrupted. This 15 // requires a recheck in second case to deal with 16 // shutdownNow race while clearing interrupt 17 /* 18 * 若是線程池正在中止,要保證當前線程是中斷狀態 19 * 若是不是,則要保證當前線程不是中斷狀態 20 * STOP狀態要中斷線程池中的全部線程,而這裏使用Thread.interrupted()來判斷是否中斷是爲了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,由於Thread.interrupted()方法會復位中斷的狀態。 21 */ 22 if ((runStateAtLeast(ctl.get(), STOP) || 23 (Thread.interrupted() && 24 runStateAtLeast(ctl.get(), STOP))) && 25 !wt.isInterrupted()) 26 wt.interrupt(); 27 try { 28 beforeExecute(wt, task); //鉤子方法 29 Throwable thrown = null; 30 try { 31 task.run(); //調用任務的run方法,而不是start()方法,由於Worker自己就是一個線程類 32 } catch (RuntimeException x) { 33 thrown = x; throw x; 34 } catch (Error x) { 35 thrown = x; throw x; 36 } catch (Throwable x) { 37 thrown = x; throw new Error(x); 38 } finally { 39 afterExecute(task, thrown); //鉤子方法 40 } 41 } finally { 42 task = null; 43 w.completedTasks++; 44 w.unlock(); //釋放Worker鎖 45 } 46 } 47 completedAbruptly = false; 48 } finally { 49 //跳出循環,執行processWorkerExit()方法 50 processWorkerExit(w, completedAbruptly); 51 } 52 }
1 //若是返回null,在runWorker方法中會執行processWorkerExit,即關閉該線程。 2 private Runnable getTask() { 3 //表示上次從隊列獲取任務是否超時 4 boolean timedOut = false; // Did the last poll() time out? 5 6 retry: 7 for (;;) { 8 int c = ctl.get(); 9 int rs = runStateOf(c); 10 11 // Check if queue empty only if necessary. 12 // 若是rs >= STOP,或者 rs=SHUTDOWN且隊列爲空,此時再也不接收新任務,將WorkerCount遞減並返回null。 13 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 14 decrementWorkerCount(); //自旋CAS遞減workerCount直到成功 15 return null; 16 } 17 18 //timed用於判斷是否須要重試控制 19 boolean timed; // Are workers subject to culling? 20 21 for (;;) { 22 //allowCoreThreadTimeOut默認是false,核心線程不進行超時控制,當線程數量大於corePoolSize時須要進行超時控制 23 int wc = workerCountOf(c); 24 timed = allowCoreThreadTimeOut || wc > corePoolSize; 25 26 //若是wc <= maximumPoolSize ,且上次從隊列獲取任務超時或本次須要進行超時控制,則跳出內層循環。 27 //timedOut=true表示上次從隊列獲取元素超時,說明隊列在上次獲取的keepAliveTime時間內是空的。 28 //timed=true說明線程數量大於corePoolSize。 29 //因此timedOut=true和timed=true同時知足則說明當前線程已經空閒了keepAliveTime時間,而且線程池的數量大於corePoolSize。這時就須要關閉多餘的空閒線程(即compareAndDecrementWorkerCount並返回null)。 30 if (wc <= maximumPoolSize && ! (timedOut && timed)) 31 break; 32 //若是線程數量大於maximumPoolSize,或者上次從隊列獲取任務超時且本次須要進行超時控制。須要遞減WorkerCount,若是遞減成功則返回null 33 if (compareAndDecrementWorkerCount(c)) 34 return null; 35 //檢查線程池運行狀態是否改變。若是改變,那麼繼續外層循環,若是未改變,那麼繼續內層循環。 36 c = ctl.get(); // Re-read ctl 37 if (runStateOf(c) != rs) 38 continue retry; 39 // else CAS failed due to workerCount change; retry inner loop 40 } 41 42 try { 43 Runnable r = timed ? 44 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 45 //超時方式獲取,注意keepAliveTime爲超出corePoolSize大小的線程的空閒存活時間 46 workQueue.take(); //阻塞方式獲取,若是隊列爲空阻塞當前線程 47 if (r != null) 48 return r; 49 timedOut = true; //若是超時,繼續循環。 50 } catch (InterruptedException retry) { 51 //若是發生中斷,則將timedOut置爲false,繼續循環 52 timedOut = false; 53 } 54 } 55 }
1 private void processWorkerExit(Worker w, boolean completedAbruptly) { 2 //若是completedAbruptly=false,說明是由getTask返回null致使的,WorkerCount遞減的操做已經執行。 3 //若是completedAbruptly=true,說明是由執行任務的過程當中發生異常致使,須要進行WorkerCount遞減的操做。 4 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 5 decrementWorkerCount(); 6 7 final ReentrantLock mainLock = this.mainLock; 8 mainLock.lock(); 9 try { 10 completedTaskCount += w.completedTasks; 11 workers.remove(w); //從workers中刪除當前worker,對workers更新須要加mainLock鎖。 12 } finally { 13 mainLock.unlock(); 14 } 15 16 tryTerminate(); 17 18 //若是是異常結束(completedAbruptly=true),須要從新調用addWorker()增長一個線程,保持線程數量。 19 //若是是由getTask()返回null致使的線程結束,須要進行如下判斷: 20 // 1)若是allowCoreThreadTimeOut=true且隊列不爲空,那麼須要至少保證有一個線程。 21 // 2)若是allowCoreThreadTimeOut=false,那麼須要保證線程數大於等於corePoolSize。 22 // 23 int c = ctl.get(); 24 if (runStateLessThan(c, STOP)) { 25 if (!completedAbruptly) { 26 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 27 if (min == 0 && ! workQueue.isEmpty()) 28 min = 1; 29 if (workerCountOf(c) >= min) 30 return; // replacement not needed 31 } 32 addWorker(null, false); 33 } 34 }
1 //根據線程池狀態判斷是否結束線程池 2 final void tryTerminate() { 3 for (;;) { 4 int c = ctl.get(); 5 //若是線程池運行狀態是RUNNING,或者大於等於TIDYING,或者運行狀態爲SHUTDOWN且隊列爲空,則直接return。 6 if (isRunning(c) || 7 runStateAtLeast(c, TIDYING) || 8 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 9 return; 10 //若是線程數不爲0,則中斷一個空閒線程並return。爲何有這一步操做。 11 if (workerCountOf(c) != 0) { // Eligible to terminate 12 interruptIdleWorkers(ONLY_ONE); 13 return; 14 } 15 16 final ReentrantLock mainLock = this.mainLock; 17 mainLock.lock(); 18 try { 19 //嘗試將狀態設置爲TIDYING狀態, 20 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 21 try { 22 //若是CAS成功,執行terminated()方法 23 terminated(); 24 } finally { 25 ctl.set(ctlOf(TERMINATED, 0)); 26 termination.signalAll(); 27 } 28 return; 29 } 30 } finally { 31 mainLock.unlock(); 32 } 33 // else retry on failed CAS 34 } 35 }
線程池運行狀態由RUNNING到SHUTDOWN的轉換。
1 public void shutdown() { 2 final ReentrantLock mainLock = this.mainLock; 3 mainLock.lock(); 4 try { 5 //安全管理,檢查方法調用者是否有權限中斷Worker線程 6 checkShutdownAccess(); 7 //運行狀態改成SHUTDOWN 8 advanceRunState(SHUTDOWN); //自旋CAS 9 //中斷空閒線程 10 interruptIdleWorkers(); 11 onShutdown(); // hook for ScheduledThreadPoolExecutor 12 } finally { 13 mainLock.unlock(); 14 } 15 //嘗試結束線程池 16 tryTerminate(); 17 } 18 19 private void interruptIdleWorkers() { 20 interruptIdleWorkers(false); 21 } 22 23 private void interruptIdleWorkers(boolean onlyOne) { 24 final ReentrantLock mainLock = this.mainLock; 25 mainLock.lock(); //對workers的操做須要獲取mainLock 26 try { 27 //遍歷全部的線程,若是沒有被中斷且獲取鎖成功則中斷線程。獲取鎖失敗時極可能該線程正在執行任務(woker執行任務時須要對woker加鎖)。 28 for (Worker w : workers) { 29 Thread t = w.thread; 30 if (!t.isInterrupted() && w.tryLock()) { 31 try { 32 t.interrupt(); 33 } catch (SecurityException ignore) { 34 } finally { 35 w.unlock(); 36 } 37 } 38 if (onlyOne) 39 break; 40 } 41 } finally { 42 mainLock.unlock(); 43 } 44 }
1 public List<Runnable> shutdownNow() { 2 List<Runnable> tasks; 3 final ReentrantLock mainLock = this.mainLock; 4 mainLock.lock(); 5 try { 6 checkShutdownAccess(); 7 advanceRunState(STOP); 8 //中斷全部線程,即便線程正在執行任務 9 interruptWorkers(); 10 //取出隊列中的任務 11 tasks = drainQueue(); 12 } finally { 13 mainLock.unlock(); 14 } 15 //嘗試結束線程池 16 tryTerminate(); 17 return tasks; 18 } 19 20 private void interruptWorkers() { 21 final ReentrantLock mainLock = this.mainLock; 22 mainLock.lock(); 23 try { 24 for (Worker w : workers) 25 w.interruptIfStarted(); 26 } finally { 27 mainLock.unlock(); 28 } 29 } 30 31 private List<Runnable> drainQueue() { 32 BlockingQueue<Runnable> q = workQueue; 33 List<Runnable> taskList = new ArrayList<Runnable>(); 34 q.drainTo(taskList); 35 if (!q.isEmpty()) { 36 for (Runnable r : q.toArray(new Runnable[0])) { 37 if (q.remove(r)) 38 taskList.add(r); 39 } 40 } 41 return taskList; 42 }
利用FutureTask能夠實現獲取異步任務的返回值、取消異步任務等功能。看一下ThreadPoolExecutor的submit方法。submit方法根據任務構造一個FutureTask對象並返回,在主線程中能夠根據FutureTask提供的方法進行任務取消和獲取異步任務的返回值。
1 public <T> Future<T> submit(Callable<T> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<T> ftask = newTaskFor(task); 4 execute(ftask); //實際執行的任務是ftask 5 return ftask; 6 }
private volatile int state; //狀態,新建立時狀態爲NEW private static final int NEW = 0; //新建立 private static final int COMPLETING = 1; //正在執行 private static final int NORMAL = 2; //正常完成 private static final int EXCEPTIONAL = 3; //執行過程當中出現異常 private static final int CANCELLED = 4; //被取消 private static final int INTERRUPTING = 5; // private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; //要執行的任務 /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; //執行callable的線程 /** Treiber stack of waiting threads */ private volatile WaitNode waiters; //Treiber算法實現的棧,用於存儲等待的線程 static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
狀態的轉換有如下幾種狀況:
1)NEW -> COMPLETING -> NORMAL 正常執行並返回;
2)NEW -> COMPLETING -> EXCEPTIONAL 執行過程當中出現異常;
3)NEW -> CANCELLED 執行前被取消
4)NEW -> INTERRUPTING -> INTERRUPTED 取消時被中斷。
1 public FutureTask(Callable<V> callable) { 2 if (callable == null) 3 throw new NullPointerException(); 4 this.callable = callable; 5 this.state = NEW; // ensure visibility of callable 6 } 7 8 public FutureTask(Runnable runnable, V result) { 9 //因爲Runnable沒有返回值,經過Executors將Runnable轉換爲Callable。 10 this.callable = Executors.callable(runnable, result); 11 this.state = NEW; // ensure visibility of callable 12 }
1 public void run() { 2 //只執行state=NEW的任務。若是state!=NEW說明任務已經執行。 3 //若是state=NEW,則經過CAS將runner置爲當前線程。若是失敗說明其餘線程已經執行。 4 if (state != NEW || 5 !UNSAFE.compareAndSwapObject(this, runnerOffset, 6 null, Thread.currentThread())) 7 return; 8 try { 9 Callable<V> c = callable; 10 if (c != null && state == NEW) { 11 V result; //任務執行結果 12 boolean ran; //任務執行期間是否發生異常 13 try { 14 result = c.call(); //執行任務 15 ran = true; 16 } catch (Throwable ex) { 17 result = null; 18 ran = false; 19 //若是發生異常,執行setException(ex) 20 setException(ex); 21 } 22 //若是正常結束,執行set(result). 23 if (ran) 24 set(result); 25 } 26 } finally { 27 // runner must be non-null until state is settled to 28 // prevent concurrent calls to run() 29 //無論任務執行是否正常,都須要將runner置爲null 30 runner = null; 31 // state must be re-read after nulling runner to prevent 32 // leaked interrupts 33 //防止中斷泄露,須要結合cancel方法研究 34 //若是s>=INTERRUPTING,說明狀態變換爲NEW -> INTERRUPTING -> INTERRUPTED,即在取消時被中斷。 35 int s = state; 36 if (s >= INTERRUPTING) 37 handlePossibleCancellationInterrupt(s); 38 } 39 }
任務執行正常結束:
1 //任務正常結束,經過CAS更新state爲COMPLETING,若是成功,將state更新爲NORMAL,喚醒等待線程。 2 protected void set(V v) { 3 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 4 outcome = v; //將運行結果result賦給outcome 5 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 6 //刪除和喚醒全部的等待線程 7 finishCompletion(); 8 } 9 }
任務執行時發生異常:
1 //任務執行時發生異常,經過CAS更新state爲COMPLETING,若是成功,將state更新爲EXCEPTIONAL,喚醒等待線程 2 protected void setException(Throwable t) { 3 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 4 outcome = t; //將異常信息賦給outcome 5 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state 6 finishCompletion(); 7 } 8 }
喚醒等待獲取任務運行結果的線程:
1 private void finishCompletion() { 2 // assert state > COMPLETING; 3 //自旋CAS更新waiters爲null直到成功 4 for (WaitNode q; (q = waiters) != null;) { 5 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 6 for (;;) { 7 Thread t = q.thread; 8 if (t != null) { 9 q.thread = null; 10 LockSupport.unpark(t); //喚醒等待線程,WaitNode是在get方法中添加的 11 } 12 WaitNode next = q.next; 13 if (next == null) 14 break; 15 q.next = null; // unlink to help gc 16 q = next; 17 } 18 break; 19 } 20 } 21 22 done(); //hook方法,默認不執行任何操做,子類能夠重寫該方法完成指定的功能(例如:回調) 23 24 callable = null; // to reduce footprint 25 }
handlePossibleCancellationInterrupt方法要確保cancel(true)產生的中斷髮生在run或runAndReset方法執行的過程當中。這裏會循環的調用Thread.yield()來確保狀態在cancel方法中被設置爲INTERRUPTED。
1 private void handlePossibleCancellationInterrupt(int s) { 2 // It is possible for our interrupter to stall before getting a 3 // chance to interrupt us. Let's spin-wait patiently. 4 if (s == INTERRUPTING) 5 while (state == INTERRUPTING) 6 Thread.yield(); // wait out pending interrupt 7 8 // assert state == INTERRUPTED; 9 10 // We want to clear any interrupt we may have received from 11 // cancel(true). However, it is permissible to use interrupts 12 // as an independent mechanism for a task to communicate with 13 // its caller, and there is no way to clear only the 14 // cancellation interrupt. 15 // 16 // Thread.interrupted(); 17 }
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 //若是state爲NEW或COMPLETING,調用awaitDone方法將當前線程添加到waiters中並阻塞 4 if (s <= COMPLETING) 5 s = awaitDone(false, 0L); 6 //若是已經完成(包括正常結束或異常結束),返回 7 return report(s); 8 } 9 10 //若是超時則拋出TimeoutException異常 11 public V get(long timeout, TimeUnit unit) 12 throws InterruptedException, ExecutionException, TimeoutException { 13 if (unit == null) 14 throw new NullPointerException(); 15 int s = state; 16 if (s <= COMPLETING && 17 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) 18 throw new TimeoutException(); 19 return report(s); 20 }
awaitDone方法,阻塞線程。
1 //timed參數表示是否使用超時機制 2 private int awaitDone(boolean timed, long nanos) 3 throws InterruptedException { 4 final long deadline = timed ? System.nanoTime() + nanos : 0L; 5 WaitNode q = null; 6 boolean queued = false; //是否已經入棧 7 for (;;) { 8 //若當前線程被中斷,則刪除q並拋出InterruptedException() 9 if (Thread.interrupted()) { 10 removeWaiter(q); 11 throw new InterruptedException(); 12 } 13 14 int s = state; 15 //若是state大於COMPLETING,代表任務已經完成,則將節點q的線程置爲null並返回狀態值。 16 if (s > COMPLETING) { 17 if (q != null) 18 q.thread = null; 19 return s; 20 } 21 //s==COMPLETING,說明任務已經執行完成但尚未設置最終狀態。 22 //Thread.yield();讓當前正在運行的線程回到可運行狀態,以容許其餘線程(包括當前線程)得到運行的機會。注意目的是嘗試讓狀態改變,繼續下個循環。 23 else if (s == COMPLETING) // cannot time out yet 24 Thread.yield(); 25 else if (q == null) 26 q = new WaitNode(); //新建WaitNode節點 27 //CAS添加到waiters棧,在阻塞以前先將節點q添加棧,入棧成功後queued更新爲true。 28 else if (!queued) 29 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 30 q.next = waiters, q); 31 else if (timed) { 32 nanos = deadline - System.nanoTime(); 33 //若是已通過期,則刪除節點q並返回 34 if (nanos <= 0L) { 35 removeWaiter(q); 36 return state; 37 } 38 LockSupport.parkNanos(this, nanos); //超時機制阻塞當前線程 39 } 40 else 41 LockSupport.park(this); //阻塞當前線程 42 } 43 } 44 45 //刪除指定節點(Treiber算法實現的棧) 46 private void removeWaiter(WaitNode node) { 47 if (node != null) { 48 node.thread = null; //將線程置爲null,由於下面要根據thread是否爲null判斷是否要把node移出 49 retry: 50 for (;;) { // restart on removeWaiter race 51 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 52 s = q.next; 53 if (q.thread != null) 54 pred = q; 55 else if (pred != null) { 56 pred.next = s; 57 if (pred.thread == null) // check for race 58 continue retry; 59 } 60 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) 61 continue retry; 62 } 63 break; 64 } 65 } 66 }
report方法,返回運行結果或拋出異常。
1 //任務完成返回執行結果或拋出異常 2 private V report(int s) throws ExecutionException { 3 Object x = outcome; 4 //若是任務正常完成,返回執行結果 5 if (s == NORMAL) 6 return (V)x; 7 //若是s >= CANCELLED,說明任務被取消,那麼就拋出CancellationException 8 if (s >= CANCELLED) 9 throw new CancellationException(); 10 //最後s==EXCEPTIONAL,任務執行時發生異常,拋出該異常 11 throw new ExecutionException((Throwable)x); 12 }
試圖取消對此任務的執行。若是任務已完成、或已取消,或者因爲某些其餘緣由而沒法取消,則此嘗試將失敗。當調用 cancel 時,若是調用成功,而此任務還沒有啓動,則此任務將永不運行。若是任務已經啓動,則 mayInterruptIfRunning 參數肯定是否應該以試圖中止任務的方式來中斷執行此任務的線程。
1 public boolean cancel(boolean mayInterruptIfRunning) { 2 //若state != NEW,說明任務已經啓動,則直接返回失敗。 3 if (state != NEW) 4 return false; 5 //若是mayInterruptIfRunning爲true,要中斷當前執行任務的線程。 6 if (mayInterruptIfRunning) { 7 //CAS更新state爲INTERRUPTING不成功,說明state已被改變(即state != NEW),則直接返回失敗。若是成功則中斷正在執行任務的線程,並喚醒等待獲取結果的線程。 8 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) 9 return false; 10 Thread t = runner; 11 if (t != null) 12 t.interrupt(); //中斷當前線程 13 //更新state爲INTERRUPTED 14 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state 15 } 16 //mayInterruptIfRunning=flase,CAS更新state爲CANCELLED,若成功則喚醒等待的線程(不中斷正在執行任務的線程),若失敗返回false。 17 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) 18 return false; 19 finishCompletion(); 20 return true; 21 }
Executors是一個工具類,提供了公共的靜態方法,例如建立默認線程工廠、建立線程池、把Runnable包裝成Callable的方法等。
DefaultThreadFactory類
1 static class DefaultThreadFactory implements ThreadFactory { 2 private static final AtomicInteger poolNumber = new AtomicInteger(1); //線程池序號 3 private final ThreadGroup group; //線程組 4 private final AtomicInteger threadNumber = new AtomicInteger(1); //線程號 5 private final String namePrefix; 6 7 DefaultThreadFactory() { 8 SecurityManager s = System.getSecurityManager(); 9 group = (s != null) ? s.getThreadGroup() : 10 Thread.currentThread().getThreadGroup(); 11 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; 12 } 13 14 public Thread newThread(Runnable r) { 15 Thread t = new Thread(group, r, 16 namePrefix + threadNumber.getAndIncrement(), //線程名 17 0); 18 //非守護線程 19 if (t.isDaemon()) 20 t.setDaemon(false); 21 //相同的優先級 22 if (t.getPriority() != Thread.NORM_PRIORITY) 23 t.setPriority(Thread.NORM_PRIORITY); 24 return t; 25 } 26 }
建立默認工廠方法:
1 public static ThreadFactory defaultThreadFactory() { 2 return new DefaultThreadFactory(); 3 }
1) newFixedThreadPool方法
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
固定線程數的線程池,corePoolSize和 maximumPoolSize 都被設置爲nThreads,keepAliveTime=0,因爲corePoolSize等於maximumPoolSize,因此keepAliveTime和maximumPoolSize參數是無效的。阻塞隊列是LinkedBlockingQueue,是一個無界隊列。正常狀況下(未執行方法shutdown()或shutdownNow()),不會調用飽和策略。
2)newSingleThreadExecutor方法
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
單個線程的線程池,corePoolSize和maximumPoolSize都爲1,其餘同FixedThreadPool。能保證任務按順序執行。
3)newCachedThreadPool方法
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
線程數可改變的線程池,corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,核心線程數爲0,最大線程數爲CAPACITY(由於CAPACITY<Integer.MAX_VALUE).keepAliveTime=60L,意味着CachedThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。CachedThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列.這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。
1 public static <T> Callable<T> callable(Runnable task, T result) { 2 if (task == null) 3 throw new NullPointerException(); 4 return new RunnableAdapter<T>(task, result); 5 } 6 7 public static Callable<Object> callable(Runnable task) { 8 if (task == null) 9 throw new NullPointerException(); 10 return new RunnableAdapter<Object>(task, null); 11 }