編者注:Java中的線程池是運用場景最多的併發組件,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。java
在開發過程當中,合理地使用線程池可以帶來至少如下幾個好處。web
下降資源消耗:經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。算法
提升響應速度:當任務到達時,任務能夠不須要等到線程建立就能當即執行。數據庫
提升線程的可管理性:線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須瞭解其實現原理。數組
代碼解耦:好比生產者消費者模式。安全
線程池實現原理
當提交一個新任務到線程池時,線程池的處理流程以下:微信
若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。併發
若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。app
若是沒法將任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟也須要獲取全局鎖)。異步
若是建立新線程將使當前運行的線程數超出maximumPoolSize,該任務將被拒絕,並調用相應的拒絕策略來處理(RejectedExecutionHandler.rejectedExecution()方法,線程池默認的飽和策略是AbortPolicy,也就是拋異常)。
ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。
線程池任務 拒絕策略包括 拋異常、直接丟棄、丟棄隊列中最老的任務、將任務分發給調用線程處理。
線程池的建立:經過ThreadPoolExecutor來建立一個線程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnableTaskQueue, handler);
建立一個線程池時須要輸入如下幾個參數:
corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到線程池的線程數等於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部基本線程。
maximumPoolSize(線程池最大數量):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,若是使用了無界的任務隊列這個參數就沒什麼效果。
keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率。
TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。能夠選擇如下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
PriorityBlockingQueue:一個具備優先級的無界阻塞隊列。
線程的狀態
在HotSpot VM線程模型中,Java線程被一對一映射到本地系統線程,Java線程啓動時會建立一個本地系統線程;當Java線程終止時,這個本地系統線程也會被回收。操做系統調度全部線程並把它們分配給可用的CPU。
thread運行週期中,有如下6種狀態,在 java.lang.Thread.State 中有詳細定義和說明:
// Thread類
public enum State {
/**
* 剛建立還沒有運行
*/
NEW,
/**
* 可運行狀態,該狀態表示正在JVM中處於運行狀態,不過有多是在等待其餘資源,好比CPU時間片,IO等待
*/
RUNNABLE,
/**
* 阻塞狀態表示等待monitor鎖(阻塞在等待monitor鎖或者在調用Object.wait方法後從新進入synchronized塊時阻塞)
*/
BLOCKED,
/**
* 等待狀態,發生在調用Object.wait、Thread.join (with no timeout)、LockSupport.park
* 表示當前線程在等待另外一個線程執行某種動做,好比Object.notify()、Object.notifyAll(),Thread.join表示等待線程執行完成
*/
WAITING,
/**
* 超時等待,發生在調用Thread.sleep、Object.wait、Thread.join (in timeout)、LockSupport.parkNanos、LockSupport.parkUntil
*/
TIMED_WAITING,
/**
*線程已執行完成,終止狀態
*/
TERMINATED;
}
線程池操做
向線程池提交任務,可使用兩個方法向線程池提交任務,分別爲execute()和submit()方法。execute()方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。經過如下代碼可知execute()方法輸入的任務是一個Runnable類的實例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,經過future的get()方法來獲取返回值,future的get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務尚未執行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理沒法執行任務異常
} finally {
// 關閉線程池
executor.shutdown();
}
合理配置線程池
要想合理配置線程池,必須先分析任務的特色,能夠從如下幾個角度分析:
任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
任務的優先級:高、中和低。
任務的執行時間:長、中和短。
任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。
性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務應配置儘量少的線程,如配置Ncpu+1個線程的線程池。因爲IO密集型任務線程並非一直在執行任務,則應配置多一點線程,如2*Ncpu。混合型的任務,若是能夠拆分,將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串行執行的吞吐量。若是這兩個任務執行時間相差太大,則不必進行分解。能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。
優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先執行。執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者可使用優先級隊列,讓執行時間短的任務先執行。依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用CPU。
線程池中線程數量未達到coreSize時,這些線程處於什麼狀態?
這些線程處於RUNNING或者WAITING,RUNNING表示線程處於運行當中,WAITING表示線程阻塞等待在阻塞隊列上。當一個task submit給線程池時,若是當前線程池線程數量還未達到coreSize時,會建立線程執行task,不然將任務提交給阻塞隊列,而後觸發線程執行。(從submit內部調用的代碼也能夠看出來)
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲以後運行任務,或者按期執行任務。ScheduledThreadPoolExecutor的功能與Timer相似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺線程,而ScheduledThreadPoolExecutor能夠在構造函數中指定多個對應的後臺線程數。
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,ScheduledThreadPoolExecutor和ThreadPoolExecutor的區別是,ThreadPoolExecutor獲取任務時是從BlockingQueue中獲取的,而ScheduledThreadPoolExecutor是從DelayedWorkQueue中獲取的(注意,DelayedWorkQueue是BlockingQueue的實現類)。
ScheduledThreadPoolExecutor把待調度的任務(ScheduledFutureTask)放到一個DelayQueue中,其中ScheduledFutureTask主要包含3個成員變量:
sequenceNumber:任務被添加到ScheduledThreadPoolExecutor中的序號;
time:任務將要被執行的具體時間;
period:任務執行的間隔週期。
ScheduledThreadPoolExecutor會把待執行的任務放到工做隊列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對隊列中的ScheduledFutureTask進行排序,具體的排序比較算法實現以下:
ScheduledFutureTask在DelayQueue中被保存在一個PriorityQueue(基於數組實現的優先隊列,相似於堆排序中的優先隊列)中,在往數組中添加/移除元素時,會調用siftDown/siftUp來進行元素的重排序,保證元素的優先級順序。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
private Thread leader = null;
private final Condition available = lock.newCondition();
}
從DelayQueue獲取任務的主要邏輯就在take()方法中,首選獲取lock,而後獲取queue[0],若是爲null則await等待任務的來臨,若是非null查看任務是否到期,是的話就執行該任務,不然再次await等待。這裏有一個leader變量,用來表示當前進行awaitNanos等待的線程,若是leader非null,表示已經有其餘線程在進行awaitNanos等待,本身await等待,不然本身進行awaitNanos等待。
// DelayedWorkQueue
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
獲取到任務以後,就會執行task的run()方法了,即ScheduledFutureTask.run():
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
推薦閱讀
本文分享自微信公衆號 - TopCoder(gh_12e4a74a5c9c)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。