在前面的一篇文章裏我對java threadpool的幾種基本應用方法作了個總結。Java的線程池針對不一樣應用的場景,主要有固定長度類型、可變長度類型以及定時執行等幾種。針對這幾種類型的建立,java中有一個專門的Executors類提供了一系列的方法封裝了具體的實現。這些功能和用途不同的線程池主要依賴於ThreadPoolExecutor,ScheduledThreadPoolExecutor等幾個類。如前面文章討論所說,這些類和相關類的主要結構以下:java
這裏不是對全部類的詳細實現作一個分析,而是從現有線程池ThreadPoolExecutor的源代碼出發,分析一個線程池應該考慮的要點。從本文總體的方向來講,主要結合前面文章中提交線程給線程池以後分爲返回結果和不返回結果的方式,按照他們執行的脈絡來分析當咱們提交一個線程到線程池以後他們內部是如何運行的。順便也詳細理解線程池這種參考實現的內部結構。安全
咱們從最初使用多線程的代碼開始,在一些示例代碼裏,咱們經過Executors.newFixedThreadPool()等方法建立了一個ExecutorService類型的線程池。實際上具體實現對應的是ThreadPoolExecutor等。而後咱們再使用這個對象的execute或者submit方法。前面咱們瞭解到,一般咱們用execute方法執行一個線程不經過這個方法自己返回執行結果或者咱們不須要利用這個方法來獲取結果。而submit方法是須要獲得結果的。那麼他們二者一個要結果,一個不要結果的是怎麼統一塊兒來的呢?若是咱們看類AbstractExecutorService的以下代碼則就能夠理解了:多線程
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
前面這30多行代碼沒什麼特別的,主要針對不一樣類型的方法簽名。他們能夠接收Runnable, Callable類型的參數。對於須要返回結果的類型,經過專門一個變量來保存結果。整體來講至關於一個簡單的包裝。而具體執行的代碼仍是要看execute方法。這裏須要注意的一點就是newTaskFor(task)方法經過一個包裝類將Callable變量包裝成一個線程,讓它能夠運行。ide
既然前面的兩種方法都歸結於同一個方法execute,那麼咱們就來看看它的具體實現:oop
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
這是ThreadPoolExecutor裏面的代碼。前面這部分的代碼看起來比較困難,並且也比較難懂。別急,咱們先把代碼放這裏。在討論這些詳細實現的思路前,咱們先看看幾個要實現線程池須要考慮的點。ui
假定咱們要實現一個線程池,那麼有哪些地方是咱們須要認真考慮的呢?從線程池自己的定義來看,它是將一組事先建立好的線程放在一個資源池裏,當須要的時候就將該線程分配給具體的任務來執行。那麼,這個池子該有多大呢?咱們線程池確定要面臨多個線程資源的訪問,是否是自己的結構要保證線程安全呢?還有,若是線程池建立好以後咱們後續有若干任務使用了線程資源,當池裏面的資源使用完以後咱們該如何安排呢?是給線程池擴容,建立更多的線程資源,仍是增長一個隊列,讓一些任務先在裏面排隊呢?在一些極端的狀況下,好比說來的任務實在是太多了線程池處理不過來,對於這些任務該怎麼處理呢?是丟棄仍是通知給請求方?線程執行的時候會有碰到異常或者錯誤的狀況,這些異常咱們該怎麼處理?怎麼樣保證這些異常的處理不會致使線程池其餘任務的正常運行不出錯呢?this
總的來講,前面的這幾個問題能夠歸結爲一下幾個方面:atom
1. 線程池的結構。spa
2. 線程池的任務分配策略。
3. 線程池的異常和錯誤處理。
下面,咱們針對這幾個問題結合源代碼詳細的分析一下。
在ThreadPoolExecutor裏面有一個AtomicInteger的數值,它用來表示兩個信息,一個是當前線程池的狀態,還有一個就是當前線程的數目。由於這兩部分都是糅合到一個整型數字裏頭,因此他們的信息訪問就比較緊湊和特殊一點:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
這部分的代碼看起來有點怪異,其實很好理解。咱們設定的線程池裏面最多能夠容納的線程數爲(2^29) -1。這也就是爲何前面用一個Integer.SIZE - 3做爲位數。這樣這個整數的0-28位表示的就是線程的數目。而高位的部分,29-31位的地方則表示線程池的狀態。這裏定義的主要有5種狀態,分別對應的值是從-1到3.
他們對應着線程的running, shutdown, stop, tidying, terminated這幾個狀態。
除了前面的幾個部分之外,線程池裏還有以下幾個成員:
private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. */ private long completedTaskCount; private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ private volatile int maximumPoolSize;
這些部分的內容看起來比較多,實際上他們幾個都是在一些方法裏常常用到的。
workQueue: 一個BlockingQueue<Runnable>隊列,自己的結構能夠保證訪問的線程安全。至關於一個排隊等待隊列。當咱們線程池裏線程達到corePoolSize的時候,一些須要等待執行的線程就放在這個隊列裏等待。
workers: 一個HashSet<Worker>的集合。線程池裏全部能夠當即執行的線程都放在這個集合裏。
mainLock: 一個訪問workers所須要使用的鎖。從前面的workQueue, workers這兩個結構咱們能夠看到,若是咱們要往線程池裏面增長執行任務或者執行完畢一個任務,都要訪問到這兩個結構。因此大多數狀況下爲了保證線程安全,就須要使用mainLock這個鎖。
corePoolSize: 處於活躍狀態的最少worker數目。咱們一個線程池裏確定事先建立好了若干個,等來執行任務的時候直接拿去就能夠跑了。那麼到底要保證最初有多少個呢?就由corePoolSize這個來指定了。
maximumPoolSize:線程池最大的長度。能夠設置的一個參數。在咱們當前池裏面的線程數到達這個數字的時候就不能再往裏面加了。須要注意的是這裏是咱們設定的一個池最大範圍。在這裏能夠設定的最大數字是(2^29) -1。
其餘還有幾個牽涉到的成員好比說RejectedExecutionHandler等,相對都比較簡單一點,代碼裏的註釋就已經可以說清楚了。
ok,有了前面這幾個基本成員的說明,咱們再看看他們使用的work的結構。既然執行的都是一個Worker的集合。那麼在這裏Worker的定義是什麼樣的呢?下面是Worker的定義代碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() == 1; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }
這裏咱們能夠看到Worker自己實現了Runnable接口,因此它能夠當成一個線程來執行。而後也繼承了AbstractQueuedSynchronizer,也能夠實現一些對自己的線程同步訪問。這裏最重要的幾個部分在於它裏面定義了一個Thread thread和Runnable firstTask。看到這裏,咱們可能會比較奇怪,咱們只是要一個能夠執行的線程,這裏放一個Thread和一個Runnable的變量作什麼呢?在Worker的run方法裏,調用的runWorker方法究竟是怎麼執行的呢?咱們再來看看runWorker方法:
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
這部分代碼看起來挺多的,裏面的beforeExecute,afterExecute的方法在默認的實現裏是空的。在一些有特定要求的地方能夠經過繼承ThreadPoolExecutor提供自定義的實現。和前面的定義結合起來看,看來Worker這裏也沒幹什麼別的,就是繞了個圈執行了裏面設定的firstTask。
若是咱們仔細看其中的代碼,還有一個須要注意的地方就是這裏用了一個while循環來執行task,而跳出循環的條件則是要task爲null。那麼這個getTask是作了什麼呢?通常來講咱們給它一個線程,執行完任務不就完了嗎?要它再去拿線程幹嗎啊?咱們看看它的實現:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
從代碼的註釋裏能夠看到,這個方法要從workQueue裏面獲取task而後提交執行。也就是說咱們線程池執行完了當前的任務後會主動到這個隊列裏來取後續等待的任務執行。若是當前線程由於超時、線程池要關閉等狀態影響則可能會退出,而若是一切都正常的話,則會從workQueue裏面調用poll或take方法取到當前任務。前面一大堆的判斷和循環就是判斷當前線程池長度是否超過maximumPoolSize以及當前狀態是否要關閉了。若是長度超了或者狀態不對則不必繼續去取任務執行了,須要儘快返回。
有了前面這部分的分析,咱們知道Worker只不過包含了一個指向咱們須要建立的Runnable對象,而後在Worker做爲線程執行的時候再來運行這個Runnable裏面的線程執行部分。
有了前面那部分的鋪墊,咱們再來回過頭看線程池拿到一個任務後execute方法的執行。咱們將這些代碼拆開來看,這是第一部分:
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
這裏獲取到當前正在執行的線程數目,若是這些線程的數目少於corePoolSize,則將該線程加入到線程池中。而後返回。
另一部分的代碼以下:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
這裏是假定若是前面不能直接加入到線程池Worker集合裏,則加入到workQueue隊列等待執行。裏面的if else判斷語句則是檢查當前線程池的狀態。若是線程池自己的狀態是要關閉並清理了,咱們則不能提交線程進去了。這裏咱們就要reject他們。因此前面咱們看到的一些線程池拒絕線程執行的機制在這裏也獲得了驗證。
最後面這部分的代碼以下:
else if (!addWorker(command, false)) reject(command);
這裏對應代碼註釋裏的第3種狀況,咱們前面作了兩種嘗試,一個是將線程加入到workers集合或者workerQueue隊列排隊。在這兩種狀況都失敗的狀況下,咱們嘗試加入一個新的線程。若是這種狀況下咱們也失敗了,則拒絕線程提交執行。
這裏幾個地方都用到了addWorker方法,而咱們既然是execute方法,確定要讓線程執行起來。但是這裏沒有見到那個地方調用線程的start方法。那麼極可能這個線程方法的具體調用就在addWorker方法裏。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; } workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); // It is possible (but unlikely) for a thread to have been // added to workers, but not yet started, during transition to // STOP, which could result in a rare missed interrupt, // because Thread.interrupt is not guaranteed to have any effect // on a non-yet-started Thread (see Thread#interrupt). if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
前面的嵌套for循環主要是用來判斷當前線程池的狀態是否能夠容許繼續加線程,同時也判斷線程池的長度是否已經超標。固然,既然咱們有一個線程加入了執行,當前運行的數量也要更新。若是沒問題,則經過break retry;跳出這兩個循環開始後面的正式執行。
在正式執行的時候咱們建立一個Worker對象,並將mainLock加鎖。保證後續執行部分是單線程執行的。在進入加鎖的部分以後還須要再一次檢查一下線程池的狀態。這裏咱們將當前的線程加入到workers集合。而後咱們經過t.start()方法正式執行線程。在這裏一個線程纔算是真正的執行起來了。
前面咱們看了一下線程池的執行機制。在默認的線程池實現裏,它是經過一個workers集合來保持最核心活躍狀態的線程組。當咱們新加入線程執行任務時,則先利用這裏的線程。若是這裏的被佔用滿了以後則加入到workQueue這個隊列裏排隊。這裏面有一個重要的地方就是要常常檢查當前線程池的狀態,只有在運行狀態的時候才能夠往裏面加線程,不然提交線程任務則會被拒絕。咱們也要檢查線程池的長度,防止提交的執行任務達到了咱們設定的上限。爲了保證線程的提交和執行安全,咱們用一個lock來管理對線程集合workers和workerQueue的加鎖控制。
在線程池中也有一些擴展點。好比在線程執行的過程當中咱們能夠覆寫beforeExecute, afterExecute方法來提供本身特定的功能。另外,當線程執行不符合條件要被丟棄或者拒絕的時候,咱們也能夠提供一些RejectExecutionHandler的具體實現。在系統的默認實現裏已經提供了5種。
總的來講,對於一個線程池,它最核心的部分是對應一個線程運行集合和一個隊列。若是咱們可以保證好他們的狀態、大小以及線程安全執行,那麼基本上一個線程的雛形就差很少完成了。