參考連接:https://blog.csdn.net/rebirth_love/article/details/51954836java
比較實用的RejectedExecutionHandler實現,丟棄最老的任務設計模式
DiscardOldestPolicy
第一部分:ThreadPoolExecutor的繼承結構緩存
根據上圖能夠知道,ThreadPoolExecutor是繼承的AbstractExecutorService(抽象類)。再來看一下AbstractExecutorService的結構能夠發現,AbstractExecutorService實現了ExecutorService,而且ExecutorService繼承Executor接口。併發
以下是Executor和ExecutorService接口中一些方法:函數
public interface Executor {
void execute(Runnable command);
}工具
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}oop
能夠簡單總結一下:Executor這個接口只有一個方法execute(Runnable command)(以Command Pattern(命令模式)的設計模式實現);在ExecutorService接口中一部分是和執行器生命週期相關的方法,而另外一部分則是以各類方式提交要執行的任務的方法。像submit()就是提交任務的一個方法,在實現中作了適配的工做,不管參數是Runnable仍是Callable,執行器都會正確執行。ExecutorService中,和生命週期相關的,聲明瞭5個方法:源碼分析
那麼再來看一下AbstractExecutorService的源碼:性能
能夠看出來:AbstractExecutorService這個類是ExecutorService的一個抽象實現。其中,提交任務的各種方法已經給出了十分完整的實現。之因此抽象,是由於和執行器自己生命週期相關的方法在此類中並未給出任何實現,須要子類擴展完善(模板方法設計模式)拿一個submit方法出來分析一下:ui
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
從代碼能夠看出實際上用到的是RunnableFuture的實現類FutureTask。但最終仍是調用了execute()方法,在子類中實現。
在正式進入ThreadPoolExecutor源碼分析以前還須要補充一點的是:Executors(工廠方法設計模式)java.util.concurrent.Executors是個工具類,提供了不少靜態的工具方法。其中不少對於執行器來講就是初始化構建用的工廠方法。
這些方法返回的ExecutorService對象最終都是由ThreadPoolExecutor實現的,根據不一樣的需求以不一樣的參數配置,或通過其它類包裝。其中,Executors中的一些內部類就是用來作包裝用的。Executors類中還有靜態的defaultThreadFactory()方法,固然也能夠本身實現自定義的ThreadFactory。
第二部分:ThreadPoolExecutor源碼分析
下面正式進入ThreadPoolExecutor:(按照程序運行順序分析)
一、ThreadPoolExecutor的全參數構造方法:
根據註釋:
二、execute方法提交任務
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
經過註釋:提交新任務的時候,若是沒達到核心線程數corePoolSize,則開闢新線程執行。若是達到核心線程數corePoolSize, 而隊列未滿,則放入隊列,
不然開新線程處理任務,直到maximumPoolSize,超出則丟棄處理。同時判斷目前線程的狀態是否是RUNNING其餘線程有可能調用了shutdown()或shutdownNow()方法,關閉線程池,
致使目前線程的狀態不是RUNNING。在上面提交任務的時候,會出現開闢新的線程來執行,這會調用addWorker()方法。
三、addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
第一部分:第一段從第3行到第26行,是雙層無限循環,嘗試增長線程數到ctl變量,而且作一些比較判斷,若是超出線程數限定或者ThreadPoolExecutor的狀態不符合要求,
則直接返回false,增長worker失敗。第二部分:從第28行開始到結尾,把firstTask這個Runnable對象傳給Worker構造方法,賦值給Worker對象的task屬性。
Worker對象把自身(也是一個Runnable)封裝成一個Thread對象賦予Worker對象的thread屬性。鎖住整個線程池並實際增長worker到workers的HashSet對象當中。
成功增長後開始執行t.start(),就是worker的thread屬性開始運行,實際上就是運行Worker對象的run方法。
Worker的run()方法實際上調用了ThreadPoolExecutor的runWorker()方法。在看runWorker()以前先看一下Worker對象。
四、Worker對象
Worker是真正的任務,是由任務執行線程完成,它是ThreadPoolExecutor的核心。每一個線程池中,有爲數不等的Worker對象,每一個Worker對象中,包含一個須要當即執行的新任務和已經執行完成的任務數量,Worker自己,是一個Runnable對象,不是Thread對象它內部封裝一個Thread對象,用此對象執行自己的run方法,而這個Thread對象則由ThreadPoolExecutor提供的ThreadFactory對象建立新的線程。(將Worker和Thread分離的好處是,若是咱們的業務代碼,須要對於線程池中的線程,賦予優先級、線程名稱、線程執行策略等其餘控制時,能夠實現本身的ThreadFactory進行擴展,無需繼承或改寫ThreadPoolExecutor。)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
它既實現了Runnable,同時也是一個AQS ( AbstractQueuedSynchronizer )。封裝了3樣東西,Runnable類的首個任務對象,執行的線程thread和完成的任務數(volatile)completedTasks。這個類還提供了interruptIfStarted()這樣一個方法,裏面作了(getState()>=0)的判斷。與此呼應,Worker的構造方法裏對state設置了-1,避免在線程執行前被停掉。注意:第一個須要執行的任務 當有新的任務須要調度,而且須要建立新的線程時,在構造函數中爲其賦值,此時新任務不放入任務緩存隊列目的是減小任務緩存隊列入隊和出隊的操做,提升調度性能(任務緩存隊列的入隊和出隊操做,會涉及鎖定和併發處理)。
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
根據代碼順序看下來,其實很簡單。
接下來看一下getTask()是怎樣實現空閒線程複用的
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}getTask()其實是從工做隊列(workQueue)中取提交進來的任務。這個workQueue是一個BlockingQueue,一般當隊列中沒有新任務的時候,則getTask()會阻塞。另外,還有定時阻塞這樣一段邏輯:若是從隊列中取任務是計時的,則用poll()方法,並設置等待時間爲keepAlive,不然調用阻塞方法take()。當poll()超時,則獲取到的任務爲null,timeOut設置爲 true。這段代碼也是放在一個for(;;)循環中,前面有判斷超時的語句,若是超時,則return null。這意味着runWorker()方法的while循環結束,線程將退出,執行processWorkerExit()方法。