接上文:java線程池的原理學習(二)java
ThreadPoolExecutor
類中將線程狀態( runState
)分爲了如下五種:segmentfault
RUNNING
:能夠接受新任務而且處理進入隊列中的任務SHUTDOWN
:不接受新任務,可是仍然執行隊列中的任務STOP
:不接受新任務也不執行隊列中的任務TIDYING
:全部任務停止,隊列爲空,進入該狀態下的任務會執行terminated()
方法TERMINATED
:terminated()
方法執行完成後進入該狀態緩存
RUNNING
-> SHUTDOWN
oop
調用了 shutdown()
方法,多是在 finalize()
方法中被隱式調用學習
(RUNNING or SHUTDOWN)
-> STOP
this
調用 shutdownNow()
線程
SHUTDOWN
-> TIDYING
code
當隊列和線程池都爲空時接口
STOP
-> TIDYING
隊列
線程池爲空時
TIDYING
-> TERMINATED
terminated()
方法執行完成
若是查看 ThreadPoolExecutor
的源碼,會發現開頭定義了這幾個變量來表明線程狀態和活動線程的數量:
//原子變量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
這個類中將二進制數分爲了兩部分,高位表明線程池狀態( runState
),低位表明活動線程數( workerCount
), CAPACITY
表明最大的活動線程數,爲2^29-1,下面爲了更直觀的看到這些數我作了些打印:
public class Test1 { public static void main(String[] args) { final int COUNT_BITS = Integer.SIZE - 3; final int CAPACITY = (1 << COUNT_BITS) - 1; final int RUNNING = -1 << COUNT_BITS; final int SHUTDOWN = 0 << COUNT_BITS; final int STOP = 1 << COUNT_BITS; final int TIDYING = 2 << COUNT_BITS; final int TERMINATED = 3 << COUNT_BITS; System.out.println(Integer.toBinaryString(CAPACITY)); System.out.println(Integer.toBinaryString(RUNNING)); System.out.println(Integer.toBinaryString(SHUTDOWN)); System.out.println(Integer.toBinaryString(STOP)); System.out.println(Integer.toBinaryString(TIDYING)); System.out.println(Integer.toBinaryString(TERMINATED)); } }
輸出:
11111111111111111111111111111 11100000000000000000000000000000 0 100000000000000000000000000000 1000000000000000000000000000000 1100000000000000000000000000000
打印的時候會將高位0省略
能夠看到,第一行表明線程容量,後面5行提取高3位獲得:
111 - RUNNING 000 - SHUTDOWN 001 - STOP 010 - TIDYING 011 - TERMINATED
分別對應5種狀態,能夠看到這樣定義以後,只須要經過簡單的移位操做就能夠進行狀態的轉換。
execute
方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /**分三步執行 * 若是workerCount<corePoolSize,則建立一個新線程執行該任務 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //建立成功則return return; c = ctl.get(); //建立失敗從新讀取狀態,隨時保持狀態的最新 } /** * workerCount>=corePoolSize,判斷線程池是否處於運行狀態,再將任務加入隊列 * */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //用於double check //若是線程池處於非運行態,則將任務從緩存隊列中刪除 if (! isRunning(recheck) && remove(command)) reject(command); //拒絕任務 else if (workerCountOf(recheck) == 0) //若是活動線程數爲0,則建立新線程 addWorker(null, false); } //若是線程池不處於RUNNING狀態,或者workQueue滿了,則執行如下代碼 else if (!addWorker(command, false)) reject(command); }
能夠看到,在類中使用了 Work
類來表明任務,下面是 Work
類的簡單摘要:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ...
Work
類實現了 Runnable
接口,使用了線程工廠建立線程,使用 runWork
方法來運行任務
建立新線程時用到了 addWorker()
方法:
/** * 檢查在當前線程池狀態和限制下可否建立一個新線程,若是能夠,會相應改變workerCount, * 每一個worker都會運行他們的firstTask * @param firstTask 第一個任務 * @param core true使用corePoolSize做爲邊界,false使用maximumPoolSize * @return false 線程池關閉或者已經具有關閉的條件或者線程工廠沒有建立新線程 */
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 只有當rs < SHUTDOWN纔有可能接受新任務 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //工做線程數量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //不合法則返回 return false; if (compareAndIncrementWorkerCount(c)) //將工做線程數量+1 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //判斷線程池狀態有沒有改變,改變了則進行外循環,不然只進行內循環 continue retry; // else CAS failed due to workerCount change; retry inner loop } } //建立新線程 Worker w = new Worker(firstTask); Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //再次檢查狀態,防止ThreadFactory建立線程失敗或者狀態改變了 int c = ctl.get(); int rs = runStateOf(c); if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); //減小線程數量 tryTerminate();//嘗試停止線程 return false; } workers.add(w);//添加到工做線程Set集合中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start();//執行任務 //狀態變成了STOP(調用了shutdownNow方法) if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }
再看 Work中
的 runWork
方法:
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true;//線程是否異常停止 try { //先取firstTask,再從隊列中取任務直到爲null while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task);//實現鉤子方法 Throwable thrown = null; try { task.run();//運行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//實現鉤子方法 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false;//成功運行,說明沒有異常停止 } finally { processWorkerExit(w, completedAbruptly); } }