JAVA線程池代碼淺析

005631934.jpgJava從1.5開始正式提供了併發包,而這個併發包裏面除了原子變量,synchronizer,併發容器,另一個很是重要的特性就是線程池.對於線程池的意義,咱們這邊再也不多說.html

上圖是線程池的主體類圖,ThreadPoolExecutor是應用最爲普遍的一個線程池實現(我也將在接下來的文字中詳細描述我對這個類的理解和執行機制),ScheduledThreadPoolExecutor則在ThreadPoolExecutor上提供了定時執行的等附加功能,這個能夠從ScheduledExecutorService接口的定義中看出來.Executors則相似工廠方法,提供了幾個很是經常使用的線程池初始化方法.java

ThreadPoolExecutor併發

這個類繼承了AbstractExecutorService抽象類,AbstractExecutorService主要的職責有2部分,一部分定義和實現提交任務的方法(3個submit方法的實現),實例化FutureTask而且交給子類執行,另一部分實現invokeAny,invokeAll方法.留給子類的方法爲execute方法,也就是Executor接口定義的方法.ide

[java]
//實例化一個FutureTask,交給子類的execute方法執行.這種設計可以保證callable和runnable的執行接口方法的一致性(FutureTask包裝了這個差異)
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
[/java]

關於FutureTask這個類的實現,我在前面的JAVALOCK代碼淺析有講過其實現原理,主要的思想就是關注任務完成與未完成的狀態,任務提交線程get()結果時被park住,等待任務執行完成被喚醒,任務執行線程在任務執行完畢後設置結果,而且unpark對應線程而且讓其獲得執行結果.this

回到ThreadPoolExecutor類.ThreadPoolExecutor須要實現除了咱們剛纔說的execute(Runnablecommand)方法外,還得實現ExecutorService接口定義的部分方法.但ThreadPoolExecutor所提供的不光是這些,如下根據個人理解來列一下它所具備的特性
1.execute流程
2.池
3.工做隊列
4.飽和拒絕策略
5.線程工廠
6.beforeExecute和afterExecute擴展.net

execute方法的實現有個機制很是重要,噹噹前線程池線程數量小於corePoolSize,那麼生成一個新的worker並把提交的任務置爲這個工做線程的頭一個執行任務,若是大於corePoolSize,那麼會試着將提交的任務塞到workQueue裏面供線程池裏面的worker稍後執行,並非直接再起一個worker,可是當workQueue也滿,而且當前線程池小於maxPoolSize,那麼起一個新的worker並將該任務設爲該worker執行的第一個任務執行,大於maxPoolSize,workQueue也滿負荷,那麼調用飽和策略裏面的行爲.線程

worker線程在執行完一個任務以後並不會馬上關閉,而是嘗試着去workQueue裏面取任務,若是取不到,根據策略關閉或者保持空閒狀態.因此submit任務的時候,提交的順序爲核心線程池——工做隊列——擴展線程池.設計

池包括核心池,擴展池(2者的線程在同一個hashset中,這裏只是爲了方便才這麼稱呼,並非分離的),核心池在池內worker沒有用完的狀況下,只要有任務提交都會建立新的線程,其表明線程池正常處理任務的能力.擴展池,是在覈心線程池用完,而且工做隊列也已排滿任務的狀況下才會開始初始化線程,其表明的是線程池超出正常負載時的解決方案,一旦任務完成,而且試圖從workQueue取不到任務,那麼會比較當前線程池與核心線程池的大小,大於核心線程池數的worker將被銷燬.
[java]htm

Runnable getTask() {
for (;;) {
try {
int state = runState;
//>SHUTDOWN就是STOP或者TERMINATED
//直接返回
if (state > SHUTDOWN)
return null;
Runnable r;
//若是是SHUTDOWN狀態,那麼取任務,若是有
//將剩餘任務執行完畢,不然就結束了
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
//若是不是以上狀態的(也就是RUNNING狀態的),那麼若是當前池大於核心池數量,
//或者容許核心線程池取任務超時就能夠關閉,那麼從任務隊列取任務,
//若是超出keepAliveTime,那麼就返回null了,也就意味着這個worker結束了
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
//若是當前池小於核心池,而且不容許核心線程池取任務超時就關閉,那麼take(),直到拿到任務或者被interrupt
else
r = workQueue.take();
//若是通過以上斷定,任務不爲空,那麼返回任務
if (r != null)
return r;
//若是取到任務爲空,那麼斷定是否能夠退出
if (workerCanExit()) {
//若是整個線程池狀態變爲SHUTDOWN或者TERMINATED,那麼將全部worker interrupt (若是正在執行,那繼續讓其執行)
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
//worker從workQueue中取不到數據的時候調用此方法,以決定本身是否跳出取任務的無限循環,從而結束此worker的運行
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
/**//*
*線程池狀態爲stop或者terminated,
*或者任務隊列裏面任務已經爲空,
*或者容許線程池線程空閒超時(實現方式是從工做隊列拿最多keepAliveTime的任務,超過這個時間就返回null了)而且
*當前線程池大於corePoolSize(>1)
*那麼容許線程結束
*static final int RUNNING = 0;
*static final int SHUTDOWN = 1;
*static final int STOP = 2;
*static final int TERMINATED = 3;
*/
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1,corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
[/java]

當提交任務是,線程池都已滿,而且工做隊列也無空閒位置的狀況下,ThreadPoolExecutor會執行reject操做,JDK提供了四種reject策略,包括AbortPolicy(直接拋RejectedExceptionException),CallerRunsPolicy(提交任務線程本身執行,固然這時剩餘任務也將沒法提交),DiscardOldestPolicy(將線程池的workQueue任務隊列裏面最老的任務剔除,將新任務丟入),DiscardPolicy(無視,忽略此任務,而且當即返回).實例化ThreadPoolExecutor時,若是不指定任何飽和策略,默認將使用AbortPolicy.blog

我的認爲這些飽和策略並不十分理想,特別是在應用既要保證快速,又要高可用的狀況下,個人想法是可以加入超時等待策略,也就是提交線程時線程池滿,可以park住提交任務的線程,一旦有空閒,能在第一時間通知到等待線程.這個實際上和主線程執行類似,可是主線程執行期間即便線程池有大量空閒也不會當即能夠提交任務,效率上後者可能會比較低,特別是執行慢速任務.

實例化Worker的時候會調用ThreadFactory的addThread(Runnabler)方法返回一個Thread,這個線程工廠是能夠在ThreadPoolExecutor實例化的時候指定的,若是不指定,那麼將會使用DefaultThreadFactory,這個也就是提供給使用者命名線程,線程歸組,是不是demon等線程相關屬性設置的機會.

beforeExecute和afterExecute是提供給使用者擴展的,這兩個方法會在workerrunTask以前和run完畢以後分別調用.JDK註釋裏DougLea(concurrent包做者)展現了beforeExecute一個頗有趣的示例.代碼以下.
[java]

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor() { super(); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
[/java]

使用這個線程池,用戶能夠隨時調用pause停止剩餘任務執行,固然也可使用resume從新開始執行剩餘任務.

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是一個很實用的類,它的實現核心是基於DelayedWorkQueue.從ScheduledThreadPoolExecutor的繼承結構上來看,各位應該可以看出些端倪來,就是ScheduledThreadPoolExecutor將ThreadPoolExecutor中的任務隊列設置成了DelayedWorkQueue,這也就是說,線程池Worker從任務隊列中取的一個任務,須要等待這個隊列中最短超時任務的超時,也就是實現定時的效果.因此ScheduledThreadPoolExecutor所作的工做實際上是比較少的.主要就是實現任務的實例化並加入工做隊列,以及支持scheduleAtFixedRate和scheduleAtFixedDelay這種週期性任務執行.
[java]

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);
}
[/java]

對於scheduleAfFixedRate和scheduleAtFiexedDelay這種週期性任務支持,是由ScheduledThreadPoolExecutor內部封裝任務的ScheduledFutureTask來實現的.這個類在執行任務後,對於週期性任務,它會處理週期時間,並將本身再次丟入線程池的工做隊列,從而達到週期執行的目的.
[java]

private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
// This might have been the final executed delayed
// task. Wake up threads to check.
else if (down)
interruptIdleWorkers();
}
[/java]

2.CompletionService

005938748.jpg

ExecutorCompletionService

CompletionService定義了線程池執行任務集,能夠依次拿到任務執行完畢的Future,ExecutorCompletionService是其實現類,先舉個例子,以下代碼,這個例子中,須要注意ThreadPoolExecutor核心池必定保證可以讓任務提交而且立刻執行,而不是放到等待隊列中去,那樣次序將會沒法控制,CompletionService也將失去效果(其實核心池中的任務完成順序仍是準確的).
[java]

public static void main(String[] args) throws InterruptedException, ExecutionException{
ThreadPoolExecutor es=new ThreadPoolExecutor(10, 15, 2000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.AbortPolicy());
CompletionService<String> cs=new ExecutorCompletionService<String>(es);
cs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.currentThread().sleep(1000);
return "i am sleeped 1000 milliseconds";
}
});
cs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.currentThread().sleep(5000);
return "i am sleeped 5000 milliseconds";
}
});
cs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.currentThread().sleep(4000);
return "i am sleeped 4000 milliseconds";
}
});
cs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.currentThread().sleep(2000);
return "i am sleeped 2000 milliseconds";
}
});
for(int i=0;i<4;i++){
Future<String> fu=cs.take();
System.out.println(fu.get());
}
}
[/java]

執行結果:

i am sleeped 1000 milliseconds
i am sleeped 2000 milliseconds
i am sleeped 4000 milliseconds
i am sleeped 5000 milliseconds

從執行結果看來,咱們發現先完成的任務先被拿出來了,直到全部任務被執行完畢,也就是CompletionService的效果達到了.

ExecutorCompletionService並不複雜,關鍵的一個點就是它的內部類QueueingFuture繼承了FutureTask類,而且實現了done()方法,done()方法是在線程池任務執行完畢,最後調用FutureTask的方法(這在JAVALOCK代碼淺析(http://www.blogjava.net/BucketLi/archive/2010/09/30/333471.html)一文中對於FutureTask代碼解析有提到)

QueueingFuture的done()方法實現是將執行完的任務(FutureTask)丟入全局的完成隊列中(completionQueue),那麼take是從這個blockingqueue中取元素.也就是任務完成就會有元素,即生產者消費者.

這種實現的思想是將本來在單個FutureTask上的等待轉化爲在BlockingQueue上的等待,即對所有FutureTask的等待,從而達到哪一個先完成,哪一個就可取執行結果的效果.
[java]

private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
his.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
[/java]

總結:JUC提供的線程池體系核心是在ThreadPoolExecutor,而ScheduledThreadPoolExecutor和ExecutorCompletionService只是對其擴展,這裏沒有去細講Executors這個便捷類,這個類提供不少便捷的線程池構建方法.各位使用的時候不妨去看下.

相關文章
相關標籤/搜索