1、線程池狀態java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING : 該狀態的線程池會接收新的任務,並處理阻塞隊列中的任務。ide
SHUTDOWN : 該狀態的線程池不會接收新的任務,但會處理阻塞隊列中的任務。oop
STOP : 該狀態的線程池不會接收新的任務,也不會處理阻塞隊列中的任務,並且會中斷正在執行的任務。ui
2、任務提交 方式this
一、executespa
提交的任務必須實現Runnable接口,接口不帶返回值線程
public void execute(Runnable command) {
二、submit
code
父類AbstractExecutorService提供有submit接口,可獲取線程執行返回值。 blog
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
3、任務執行 -- execute繼承
execute 方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
大體流程爲:
一、經過workerCountOf方法獲得線程池的當前線程數,若是當前線程數小於corePoolSize,則執行addWorker方法建立一個新的核心線程執行任務。
二、若是當前線程數大於等於corePoolSize時,檢查線程池的運行狀態,若是線程池運行狀態爲RUNNING,則嘗試將任務加入阻塞隊列。
三、再次檢查線程池的運行狀態,若是運行狀態不爲RUNNING,則從阻塞隊列中刪除任務並執行reject方法調用處理機制。
四、在2的基礎上,若是加入阻塞隊列失敗,則會執行addWorker方法建立一個新的非核心線程執行任務。
五、在3的基礎上,若是addWorker執行失敗,則會調用reject調用處理機制。
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
大體流程爲:
一、自旋檢測線程池狀態,若是狀態大於SHUTDOWN,或者 firstTask爲空 或隊列爲空 時,返回任務加入隊列失敗。
二、獲取線程池當前線程數,經過core判斷是不是建立核心線程,若是爲true,而且當前線程數wc小於corePoolSize時,跳出循環建立新的線程。若是core爲false,
則判斷當前線程數wc是否小於maximumPoolSize,小於跳出循環。
三、線程池的工做線程時候經過Worker實現的,經過ReentrantLock加鎖,再次經過線程池狀態監測以後,將worker加入到HashSet<Worker> workers 裏面
四、若是加入成功,則啓動Worker中的線程。
Worker類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
Worker類繼承了AbstractQueuedSynchronizer(AQS)類,能夠方便的實現工做線程的停止操做。
而且自己實現了Runnable接口,可單獨做爲任務在工做線程中執行。
runWorker 方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker流程:
一、線程啓動以後,經過unlock方法釋放鎖,設置AQS的state爲0,表示運行中斷;
二、獲取第一個任務firstTask,並執行task的run方法,在執行run方法前,會對Worker加鎖,任務執行完釋放鎖。
三、在任務執行先後,可根據業務自定義實現beforeExecute(wt, task); 和 afterExecute(task, thrown);。
四、任務執行完以後,調用getTask從阻塞隊列中獲取等待的任務,若是隊列中沒有任務,getTask方法會被阻塞並掛起,不會佔用CPU資源。
getTask方法
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask流程:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
一、若是設定了超時機制,則經過 workQueue.poll()方法來獲取阻塞隊列中的任務,若是隊列中沒有任務,則會在keepAliveTime時間後返回null。
二、若是未設置超時機制,而且當前線程數小於核心線程時,同時未設置容許核心線程超時的狀況下,經過workQueue.take(); 方法來獲取阻塞隊列中的任務,若是沒有任務,
則會一直等待並掛起,直到有新任務提交時,則會環信等待的隊列並返回新的任務。
三、阻塞隊列使用生產者與消費者模式,使用等待與喚醒使線程池線程掛起與喚起。
4、任務執行 -- submit
submit重載了多種實現方式
一、Callable
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
二、Runnable
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
在實際業務中,Future和Callable是成雙出現的,Callable負責產生結果,Future負責獲取結果。
一、Callable相似於Runnable,只是Callable附帶返回值。
二、Callable除了正常返回以外,若是線程出現異常,該異常也會返回,即Future的get方法能夠獲取到異常結果。
三、Future的get()方法會致使主線程阻塞,直到Callable執行完成。
FutureTask
futureTask內部狀態
* Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
FutureTask 實現了Runnable接口,提交的任務能夠交由工做線程處理,執行run方法。
get方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
調用get方法時,若是task的狀態處於執行中或初始化,調用awaitDone方法對線程進行阻塞。
awaitDone方法
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
經過對Task的狀態檢測,若是Callable未執行完成,使用 LockSupport.park(this); 對當前線程進行阻塞。等待喚起,並將主線程封裝成WaitNode 並存放在 waiters 鏈表中。
run方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run方法流程:
經過對task的state判斷,若是task爲初始New狀態,則執行call方法,獲取call方法返回結果,並調用set方法
若是執行失敗,則調用setException方法。
setException方法
設置狀態 EXCEPTIONAL
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
set方法
設置狀態 NORMAL
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
finishCompletion();方法
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
若是finishCompletion 檢測到 經過get方法被阻塞的線程集 waiters 不爲空時,獲取的每個節點,並使用 LockSupport.unpark(t); 對其喚醒。
最終使用report返回結果。