本文首發於我的微信公衆號《andyqian》,期待你的關注微信
在上一篇文章《Java線程池ThreadPoolExecutor》中描述了ThreadPoolExecutor的基本概念,以及一些經常使用方法。這對於咱們來講,是遠遠不夠的,今天就一塊兒來看TreadPoolExecutor類的內部實現。併發
在學習ThreadPoolExecutor源碼時,首先來看看下面這段代碼,也是很是重要的一段代碼。函數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
這裏使用 AtomicInteger 類型的 ctl 變量,同時記錄線程池的運行狀態以及線程池的容量。學習
其中:this
高三位用來存儲線程池運行狀態,其他位數表示線程池的容量。spa
線程池狀態分爲RUNNING狀態,SHUTDOWN 狀態,STOP狀態,TIDYING 狀態,TERMINATED 狀態。線程
分別以下所述:3d
RUNNIN狀態 code
在該狀態下,線程池接受新任務並會處理阻塞隊列中的任務。接口
其二進制表示的,高三位值是 111。
源碼:
private static final int RUNNING = -1 << COUNT_BITS;
其中 COUNT_BITS = 29,二進制表示以下:
1110 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN 狀態
在該狀態下,線程池不接受新任務,但會處理阻塞隊列中的任務。
其二進制的高三位爲: 000。
源碼:
private static final int SHUTDOW = -1 << COUNT_BITS;
其中 COUNT_BITS = 29,二進制以下所述:
0000 0000 0000 0000 0000 0000 0000 0000
STOP 狀態
在該狀態下,線程池不接受新的任務且不會處理阻塞隊列中的任務,而且會中斷正在執行的任務。
其二進制的高三位爲: 010。
源碼:
private static final int STOP = 1 << COUNT_BITS;
其中 COUNT_BITS = 29,二進制以下所述:
010 0000 0000 0000 0000 0000 0000 0000
TIDYING狀態
全部任務都執行完成,且工做線程數爲0,將要調用terminated方法。
其二進制的高三位爲: 010。
源碼:
private static final int TIDYING = 2<< COUNT_BITS;
其中 COUNT_BITS = 29,二進制以下所述:
0100 0000 0000 0000 0000 0000 0000 0000
TERMINATED狀態
最終狀態,爲執行terminated()方法後的狀態。
二進制的高三位爲110。
源碼:
private static final int TERMINATE = 3 << COUNT_BITS;
其中 COUNT_BITS = 29,二進制以下所述:
1100 0000 000 0000 0000 0000 0000 0000
其狀態的轉換關係以下:
當調用:shutdown() 方法時,其狀態由 RUNNING 狀態 轉換爲 SHUTDOWN (狀態)。
當調用:shutdownNow() 方法是,其狀態由 (RUNNING or SHUTDOWN) 轉換爲 STOP。
當阻塞隊列與線程池二者均爲空時,狀態由 SHUTDOWN 轉換爲 TIDYING。
當線程池任務爲空時,狀態由 STOP 轉換爲 TIDYING 。
當 terminated() 方法執行完成後,狀態由 TIDYING 轉換爲 TERMIN。
下面方法是執行任務的方法,代碼不難,其核心邏輯以下所示:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 判斷workerCount線程數是否小於核心線程數 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 當線程池處於運行狀態,而且向workQueue中添加執行任務。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次檢查(併發考慮),當線程池處於非running狀態時,則從workQueue移除剛添加的任務。並執reject策略。 if (! isRunning(recheck) && remove(command)) reject(command); //當線程池中的workerCount爲0時,此時workQueue中還有待執行的任務,則新增一個addWorker,消費workqueue中的任務。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 當隊列滿時,則調用addWorker方法。若是addWorker失敗,表示當前執行任務超過了當前workerQueue容量,且工做線程數數大於maximumPoolSize,則執行reject策略。 else if (!addWorker(command, false)) reject(command); }
其處理邏輯是:
當待執行任務爲空時,則拋出 NullPointerExecption 異常。
經過workerCountOf方法獲取線程池的工做線程數,當其小於核心線程數時,則經過addWorker方法添加一個核心線程,並將任務給之執行。
當工做線程數大於corePoolSize時,則判斷線程池是否處於運行狀態,並向workQueue中添加任務。基於防止併發的目的,進行了雙重檢查,若是線程池處於非運行狀態且remove任務失敗時,則執行reject方法。
當工做線程爲0時,則調用addWorker方法,建立worker消費workqueue存在的task。
當workQueue滿時,則調用addWorker方法進行添加worker。若是addWorker失敗,則說明workQueue已滿,且線程池工做數已大於maximumPoolSize,則執行reject方法。
咱們如今將目光移到addWorker方法上,其源碼以下所示:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty( return false; for (;;) { int wc = workerCountOf(c); // 若是workcount數大於線程池最大值,或者大於corePoolSize,maximumPoolSize時,則返回false。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 進行WorkCount的 CAS 操做,並結束循環。 if (compareAndIncrementWorkerCount(c)) break retry; // 當CASC操做失敗,且運行狀態已改變時,則繼續執行CAS操做。 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 其中Worker 爲實現 AbstractQueuedSynchronizer 和 Runnable 的內部類 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖,控制併發 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 校驗線程是否處於處於活躍狀態 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將 worker 添加到線程池中,其實現爲:HashSet<Worker> workers = new HashSet<Worker>(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功時,則調用start進行執行。 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 若是線程未啓動成功,則執行addWorkerFailed方法。 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker 類
構造函數:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //經過ThreadFactory()工廠建立線程 this.thread = getThreadFactory().newThread(this); }
由於 Worker 實現了Runable接口,在調用start()方法候,實際執行的是run方法,代碼以下所示:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try while (task != null || (task = getTask()) != null) { w.lock(); // if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //執行前的操做方法,空實現。可根據需求進行實現。 beforeExecute(wt, task); Throwable thrown = null; try { //任務執行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 執行後方法,空實現。可根據實際需求進行實現。 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
上面簡單分析了ThreadPoolExecutor源碼,其中涉及到比較多的知識點,有鎖,有位運算等等。若是不清楚的,能夠先忽略掉,對主要流程理清楚後,再回過頭來看看不清楚的知識點,這算是我看源碼時的一個小方法。
相關閱讀: