對於核心的幾個線程池,不管是newFixedThreadPool()方法,newSingleThreadExecutor()仍是newCachedThreadPool()方法,雖然看起來建立的線程有着徹底不一樣的功能特色,但其內部實現均使用了ThreadPoolExecutor實現,下面給出了三個線程池的實現方式.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
由以上線程池的實現代碼能夠看到,他們都是ThreadPoolExecutor類的封裝. 讓咱們看一下ThreadPoolExecutor最重要的構造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
函數的參數以下:
- corePoolSize 指定線程池中的線程數量
- maximumPoolSize 指定了線程池中的最大線程數量
- keepAliveTime 當前線程池數量超過corePoolSize時,多餘的空閒線程的存活時間,
- uintkeepAliveTime的單位
- workQueue 隊伍隊列,被提交但還沒有被執行的任務.
- threadFactory:線程工廠 用於建立線程,通常用默認便可
- handler 拒絕策略 當任務太多來不及處理,如何拒絕任務
以上參數中,大多數都很簡單,只有workQueue和handler須要進行詳細說明.
參數workQueue指被提交但未執行的任務隊列,他是一個BlockingQueue接口的對象,用於存放Runable對象,根據隊列功能分類,子ThreadPoolExecutor的構造函數中使用一下幾種BlockIngQueue.
- 直接提交的隊列,改功能由synchronousQueue對象提供,SynchronousQueue是一個特殊的BlockingQueue.這個隊列沒有容量,每個插入操做都要等待一個響應的刪除操做,反之,每個刪除操做都要等待對應的插入操做,若是使用SynchronousQueue,提交的任務不會真實的保存,而老是將新任務提交給線程執行, 若是沒有空閒的進程,則嘗試建立新的進程,若是進程數量已經達到最大值,則執行拒絕策略,使用SynchronousQueue隊列,一般要設置很大的maximumPoolSize值,不然很容易執行拒絕策略.
- 有界的任務隊列,有界的任務隊列可使用ArrayBlockingQueue實現,ArrayBlockingQueue的構造函數必須帶一個容量參數,表示該隊列的最大容量,如寫所示:
public ArrayBlockingQueue(int capacity)
當使用有界的任務隊列時,如有新的任務須要執行,若是線程池的實際線程數小於corePoolSize,則會優先建立新的新線程,若大於corePoolSize,則會將新任務加入等待隊列,若等待隊列已經滿,沒法加入,則在總線程不大於maximumPoolSize的前提下,建立新的線程執行任務,若大於maximumPoolSize,則執行拒絕策略,可見,有界隊列金當在任務隊列裝滿時,纔可能將線程數量提高到corePoolSize以上,換言之,除非系統很是繁忙.不然確保核心線程維持在corePoolSize.
- 無界的任務隊列:無界的任務隊列能夠經過LinkedBlockingQueue類實現,與有界隊列相比,除非系統資源耗盡,不然無界的任務隊列不存在任務入隊失敗的狀況,當有新的任務到來,系統的線程數小於corePoolSize時,線程池會生成新的線程執行任務,但當系統的線程數達到corePoolSize後,就並不會繼續增長,若後續仍有席你的任務加入,而又沒有空閒的線程資源,責任務直接進入對列等待,若任務建立和處理的速度差別很大,無界隊列會保持快速增加,直到耗盡系統內存.
- 優先任務隊列:優先任務隊列是帶有執行優先級的隊列,它經過PriorityBlockingQueue實現,能夠控制任務的執行順序,他是一個特殊的無界隊列,不管是有界隊列ArrayBlockingQueue,仍是未指定大小的無界隊列LinkedBlockingQueue都是按照先進先出的算法處理任務的,而PriorityBlockingQueue則能夠根據自身的優先級順序前後執行,確保系統性能的同時,也能有很好的質量保證.
回顧newFixedThreadPool()的方法實現,
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
咱們發現它用了corePoolSize和maximumPoolSize大小同樣,而且使用了LingkedBlockingQueue任務隊列的線程池.由於固定大小的線程池而言,不存在線程數量的動態變化,同時它使用無界隊列存放沒法當即執行的任務,當任務提交很是頻繁的時候,改隊列可能迅速膨脹.從而耗盡系統性能.
newSingleThreadExecutor()返回的單線程線程池,是newFixedThreadPool()方法的一種退化,只是簡單的將線程池線程數量設置爲1
newCachedThreadPool()方法的實現:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
這就意味着無任務時,線程池內無線程,而當任務提交時,該線程池會使用空閒的線程執行任務,若無空閒線程,則將任務加入SynchronousQueue隊列,而SynchronousQueue隊列是一種直接提交的隊列,他總會迫使線程池增長新的線程執行任務,.當任務執行完畢後,因爲corePoolSize爲0 所以空閒線程又會在指定的60s內回收.
對於這個線程池,若是同時有大量任務被提交,而任務的執行又不那麼快,那麼系統便會開啓等量的線程處理,這樣作法可能會很快耗盡系統的資源,
這裏咱們看一看ThreadPoolExecutor線程池的核心調度代碼,這段代碼也充分體現了上述線程池的工做邏輯:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
workerCountOf()方法取得了當前線程池的線程總數,當線程總數小於corePoolSize核心線程數時,會將任務經過addWorker()方法直接調度執行,不然則在workQueue.offer()進入等待隊列,若是進入等待隊列失敗,則會執行將任務直接提交給線程池,若是當期已經達到maximumPoolSize,則提交失敗,執行拒絕策略.
ThreadPoolExecutor的最後一個參數制定了拒絕策略,也就是當任務數量超過系統實際承載能力時,該如何處理呢?這時候就要用到拒絕策略了,,
JDK內置提供了四種拒絕策略.
- AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工做
- CallerRunsPolicy策略,只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務.顯然這樣作不會真的丟棄任務,可是任務提交線程的性能極有可能會急劇降低.
- DiscardOledestPolicy策略: 改策略將丟棄最古老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務.
- DiscardPolicy策略,該策略默默的丟棄沒法處理的任務,不與任何處理,若是容許人物丟棄,我以爲這多是最好的一種方案了吧!
以上內置策略均實現了RejectedExecutionHandler接口 若以上策略仍沒法知足實際應用須要,徹底能夠本身拓展RejectedExecutionHandler接口 定義以下:
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
*
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
其中r爲請求執行的任務,executor爲當前線程池.
咱們簡單的自定義線程池和拒絕策略的使用:
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
(r, executor) -> System.out.println(r.toString() + " is discard"));
for (int i = 0; i < Integer.MAX_VALUE; i++) {
es.submit(task);
Thread.sleep(10);
}
}
}
}
上訴代碼咱們自定義了一個線程池,該池子有5個常駐線程,而且最大的線程數也是5個,這和固定大小的線程池是同樣的,可是他卻擁有一個只有10個容量的等待隊列,由於使用無界隊列極可能不是最佳解決方案,若是任務量極大,極可能會吧內存呈爆,給一個合理的隊列大小,也合乎常理的選擇,同時,這裏定義了拒絕策略,.咱們不拋出異常,由於萬一在任務提交端沒有進行異常處理,則有可能使得整個系統都崩潰,這極有可能不是咱們但願遇到的,但做爲必要的信息記錄,咱們將任務丟棄的信息進行打印.固然這是比內置的DiscardPolicy策略高級那麼一點點,
因爲上述代碼中,MyTask執行須要花費100毫秒,所以 必然致使大量的任務被直接丟棄,輸入以下:
在實際的應用中,咱們能夠將更詳細的信息記錄到日誌中,來分析系統的負載和任務丟失的狀況