轉載自http://www.linuxidc.com/Linux/2014-11/108791.htmlinux
相關類Executor,Executors,AbstractExecutorService,ExecutorService緩存
Executor:整個線程池執行者框架的頂層接口。定義了一個execute方法,整個線程執行者框架的核心方法。安全
public interface Executor {框架
void execute(Runnable command);
}函數
ExecutorService:這是一個接口它繼承自Executor,定義了shutdown,shutdownNow,awaitTermination,submit,invokeAll等方法。this
AbstractExecutorService:實現了ExecutorService接口中的submit,invokeAll的方法。線程
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}設計
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}htm
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}對象
在這裏,全部submit方法提交的任務最終仍是調用了execute方法,execute是接口Executor中定義的方法,AbstractExecutorService沒有實現它,
須要子類去實現這個方法,ThreadPoolExecutor繼承了AbstractExecutorService,它實現了execute方法。ScheduledThreadPoolExecutor繼承自
ThreadPoolExecutor,並覆蓋了ThreadPoolExecutor的execute方法。這個方法是線程執行框者架的核心邏輯,不一樣的線程池執行者有不一樣的實現邏輯。
AbstractExecutorService的功能較爲簡單,實現了不一樣參數的submit,invokeAll方法。
ThreadPoolExecutor線程池執行者:它有一個核心的成員變量:
private final HashSet<Worker> workers = new HashSet<Worker>();
workers能夠看作是ThreadPoolExecutor中用於運行任務的線程池。
worker是一個封裝了一個Thread對象並實現了Runnable接口的類。封裝Thread很容易理解,由於它要利用Thread去運行execute方法提交過來的runnable任務,可是爲何會繼承runnable接口呢?
下面是剔除了部分代碼的Worker源碼:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
Worker是ThreadPoolExecutor的一個內部類,Worker自己實現了Runnable接口,並封裝了一個Thread對象,最後在構造方法中獲取了一個Runnable對象,這個對象就是ThreadPoolExecutor經過execute提交過來的目標任務。
跟蹤runWorker(this)方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//在這裏直接調用了目標任務的run方法,並無將它傳給Thread對象。
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
回過頭來在看看Worker的構造方法:
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
它將本身傳給了本身的成員變量thread。目標任務被執行的步驟可能就是:Worker的成員變量thread啓動後調用worker的run方法,worker的run方法中將本身傳給runWorker,runWorker在調用目標執行對象的run方法。
那麼thread是什麼時候被執行的呢?
下面看看ThreadPoolExecutor中的一個其餘方法:
private boolean addWorker(Runnable firstTask, boolean core) {
......
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;//這裏初始化一個Worker對象w,在將w的成員變量thread付給t
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//在這裏調用t的start方法。
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這裏爲何會設計的這麼繞,我想主要是Worker不只封裝了一個thread,並且對目標任務進行了封裝,在運行封裝事後的目標任務前,addWorker能夠作一些相關操做。
這裏僅僅介紹了ThreadPoolExecutor的線程池,那麼這個線程池是如何被維護的,下面介紹幾個關鍵的參數。
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
這三個是ThreadPoolExecutor的成員變量,其中workQueue跟縣城池沒有關係。workQueue是一個線程安全的阻塞隊列。
corePoolSize是線程池的核心大小,maximumPoolSize是線程池的最大大小。
當提交新任務時,若是ThreadPoolExecutor中有線程在運行,而且線程的數量小於corePoolSize,那麼就會有新的線程被建立。若是當前運行的線程數大於corePoolSize,就會放到緩存隊列workQueue中。若是緩衝隊列也滿了,就繼續建立線程,直到線程的數量達到maximumPoolSize
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//判斷若是當前運行的線程數小於 corePoolSize,添加新的線程(addWorker會添加一個新的線程,上面有介紹),方法直接返回。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//若是當前的運行的線程數大於或等於corePoolSize則新的任務會放到緩存隊列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
//最後緩衝隊列添加失敗,則會繼續添加線程。若是添加新的線程失敗,則拒絕這個任務。
reject(command);
}
還有些其餘的參數:
private volatile ThreadFactory threadFactory //線程的工廠函數。
private volatile RejectedExecutionHandler handler;//任務拒絕的處理類。
private volatile long keepAliveTime;//任務等待的是將。
ThreadPoolExecutor有幾個構造方法來初始化這些參數。Executors類將這些參數簡化了來得到一個ExecutorService的引用。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
這四個方法中前兩個的核心線程數和最大線程數相同,全部可運行的線程數是固定的,<=nThreads。當任務數大於nThreads時,就是放入緩衝隊列中。 後兩個方法中,線程數是無邊界的,核心線程數是0,最大線程數是整型的最大值,而後若是有線程60秒內沒有任務運行的話就銷燬。每次有新的任務來,都會建立新的線程或使用之前建立的線程(60秒內沒有任務運行的線程)。你可能有疑問,既然核心線程數是0,那麼全部的任務不是都放到隊裏裏了嗎?那麼如今就來看看SynchronousQueue這個隊裏,能夠看看這裏的介紹 http://www.linuxidc.com/Linux/2014-11/108792.htm 。
回過頭來看看任務提交方法的源碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//這裏是在往隊列裏方任務,若是不成功就會添加Worker(封裝了線程對象)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上面連接裏提到:offer()往queue裏放一個element後當即返回,若是碰巧這個element被另外一個thread取走了,offer方法返回true,認爲offer成功;不然返回false。
試想一下,第一次提交任務的時候,核心線程數爲0,此時沒有線程因此沒有線程從workQueue中取東西,因此這裏的workQueue.offer(command)會返回false,那麼就會經過addWorker(command, false)建立一個新的線程。