要求:線程資源必須經過線程池
提供,不容許在應用自行顯式建立線程; 說明:使用線程池的好處是減小在建立和銷燬線程上所花的時間以及系統資源的開銷,解決資源不足的問題。若是不使用線程池,有可能形成系統建立大量同類線程而致使消耗內存或者「過分切換」的問題。java
by 《阿里巴巴Java手冊》編程
線程池,顧名思義是一個放着線程的池子,這個池子的線程主要是用來執行任務的。當用戶提交任務時,線程池會建立線程去執行任務,若任務超過了核心線程數的時候,會在一個任務隊列裏進行排隊等待,這個詳細流程,咱們會後面細講。 任務,一般是一些抽象的且離散的工做單元,咱們會把應用程序的工做分解到多個任務中去執行。通常咱們須要使用多線程執行任務的時候,這些任務最好都是相互獨立的,這樣有必定的任務邊界供程序把控。 多線程,當使用多線程的時候,任務處理過程就能夠從主線程中剝離出來,任務能夠並行處理,同時處理多個請求。固然了,任務處理代碼必須是線程安全的。數組
一共有7個:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,(5+2,前5個重要)緩存
這邊咱們區分兩個概念:安全
當前線程總數< corePoolSize
,新建的線程即爲核心線程。當前線程總數< corePoolSize
,新建的線程即爲核心線程。核心線程默認狀況下會一直存活在線程池中,即便這個核心線程不工做(空閒狀態),除非ThreadPoolExecutor 的 allowCoreThreadTimeOut
這個屬性爲 true
,那麼核心線程若是空閒狀態下,超過必定時間後就被銷燬。多線程
線程總數 = 核心線程數 + 非核心線程數併發
keepAliveTime即爲空閒線程容許的最大的存活時間。若是一個非核心線程空閒狀態的時長超過keepAliveTime了,就會被銷燬掉。注意:若是設置allowCoreThreadTimeOut = true
,就變成核心線程超時銷燬了。less
TimeUnit 是一個枚舉類型,列舉以下:性能
單位 | 說明 |
---|---|
NANOSECONDS | 1微毫秒 = 1微秒 / 1000 |
MICROSECONDS | 1微秒 = 1毫秒 / 1000 |
MILLISECONDS | 1毫秒 = 1秒 /1000 |
SECONDS | 秒 |
MINUTES | 分 |
HOURS | 小時 |
DAYS | 天 |
當核心線程都在工做的時候,新提交的任務就會被添加到這個工做阻塞隊列中進行排隊等待;若是阻塞隊列也滿了,線程池就新建非核心線程去執行任務。workQueue維護的是等待執行的Runnable對象。 經常使用的 workQueue 類型:(無界隊列、有界隊列、同步移交隊列)ui
適用於很是大的或者無界的線程池,能夠避免任務排隊
,SynchronousQueue隊列接收到任務後,會直接將任務從生產者移交給工做者線程
,這種移交機制高效。它是一種不存儲元素的隊列,任務不會先放到隊列中去等線程來取,而是直接移交給執行的線程。只有當線程池是無界的或能夠拒絕任務的時候,SynchronousQueue隊列的使用纔有意義,maximumPoolSize 通常指定成 Integer.MAX_VALUE,即無限大。要將一個元素放入SynchronousQueue,就須要有另外一個線程在等待接收這個元素。若沒有線程在等待,而且線程池的當前線程數小於最大值,則ThreadPoolExecutor就會新建一個線程;不然,根據飽和策略,拒絕任務。newCachedThreadPool
默認使用的就是這種同步移交隊列。吞吐量高於LinkedBlockingQueue。鏈表結構
的阻塞隊列,FIFO原則排序
。當任務提交過來,若當前線程數小於corePoolSize核心線程數,則線程池新建核心線程去執行任務;若當前線程數等於corePoolSize核心線程數,則進入工做隊列進行等待。LinkedBlockingQueue隊列沒有最大值限制,只要任務數超過核心線程數,都會被添加到隊列中,這就會致使總線程數永遠不會超過 corePoolSize
,因此maximumPoolSize 是一個無效設定。newFixedThreadPool
和newSingleThreadPool
默認是使用的是無界LinkedBlockingQueue隊列
。吞吐量高於ArrayBlockingQueue。數組結構
的有界
阻塞隊列,能夠設置隊列上限值,FIFO原則排序
。當任務提交時,若當前線程小於corePoolSize核心線程數,則新建核心線程執行任務;若當先線程數等於corePoolSize核心線程數,則進入隊列排隊等候;若隊列的任務數也排滿了,則新建非核心線程執行任務;若隊列滿了且總線程數達到了maximumPoolSize最大線程數,則根據飽和策略進行任務的拒絕。注意: 只有當任務相互獨立沒有任何依賴的時候,線程池或工做隊列設置有界是合理的;若任務之間存在依賴性,須要使用無界的線程池,如newCachedThreadPool,不然有可能會致使死鎖問題。
建立線程的方式,這是一個接口,你 new 他的時候須要實現他的 Thread newThread(Runnable r) 方法,通常用不上,
拋出異常專用,當隊列和最大線程池都滿了以後的飽和策略。
通常流程即爲:建立worker線程;添加任務入workQueue隊列;worker線程執行任務。
未達到 corePoolSize
,則新建一個線程(核心線程)
執行任務達到了 corePoolSize
,則將任務移入阻塞隊列等待
,讓空閒線程處理;隊列已滿
,新建線程(非核心線程)
執行任務總線程數又達到了 maximumPoolSize
,就會按照拒絕策略處理沒法執行的任務,好比RejectedExecutionHandler拋出異常。這邊,爲了你們可以更好的去理解這塊的流程,咱們舉一個例子。生活中咱們常常會去打一些公司的諮詢電話或者是一些特定機構的投訴電話,而那個公司或者機構的客服中心就是一個線程池
,正式員工的客服小姐姐就比如是核心線程
,好比有6個客服小姐姐。 5. 當用戶的電話打進到公司的客服中心的時候(提交任務)
; 6. 客服中心會調度客服小姐姐去接聽電話(建立線程執行任務)
,若是接聽的電話超過了6個,6個客服小姐姐都在接聽的工做狀態了(核心線程池滿了)
,這時客服中心會有一個電話接聽等待通道(進入任務隊列等待)
,就是咱們常常聽到的「您的通話在排隊,前面排隊n人。」 7. 固然,這個電話接聽等待通道也是有上限的,當超過這個上限的時候(任務隊列滿了)
,客服中心就會當即安排外協員工(非核心線程)
,也就是非正式員工去接聽額外的電話(任務隊列滿了,正式和非正式員工數量>總任務數,線程池建立非核心線程去執行任務)
。 8. 當用戶電話數激增,客服中心控制檯發現這個時候正式員工和外協員工的總和已經知足不了這些用戶電話接入了(總線程池滿)
,就開始根據一些公司電話接聽規則去拒絕這些電話(按照拒絕策略處理沒法執行的任務)
terminated()
後會更新爲這個狀態。ThreadPoolExecutor
,在java.util.concurrent下。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
public ThreadPoolExecutor(int corePoolSize, //核心線程數 int maximumPoolSize, //最大線程數 long keepAliveTime, //空閒線程存活時間 TimeUnit unit, //存活時間單位 BlockingQueue<Runnable> workQueue, //任務的阻塞隊列 ThreadFactory threadFactory, //新線程的產生方式 RejectedExecutionHandler handler //拒絕策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
ThreadPoolExecutor 繼承 AbstractExecutorService;AbstractExecutorService 實現 ExecutorService, ExecutorService 繼承 Executor
public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
複製代碼
1)5參數構造器
// 5參數構造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 複製代碼
2)6參數構造器-1
// 6參數構造器-1
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 複製代碼
3)6參數構造器-2
// 6參數構造器-2
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 複製代碼
4)7參數構造器
// 7參數構造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
//建立固定數目線程的線程池
Executors.newFixedThreadPool(200);
//建立一個無限線程的線程池,無需等待隊列,任務提交即執行
Executors.newCachedThreadPool()
//建立有且僅有一個線程的線程池
Executors.newSingleThreadExecutor();
複製代碼
newCachedThreadPool將建立一個可緩存的線程,若是當前線程數超過處理任務時,回收空閒線程;當需求增長時,能夠添加新線程去處理任務。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
newFixedThreadPool建立一個固定長度的線程池,每次提交一個任務的時候就會建立一個新的線程,直到達到線程池的最大數量限制。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
newScheduledThreadPool建立一個固定長度的線程池,而且以延遲或者定時的方式去執行任務。
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
複製代碼
newSingleThreadExecutor顧名思義,是一個單線程的Executor,只建立一個工做線程執行任務,若這個惟一的線程異常故障了,會新建另外一個線程來替代,newSingleThreadExecutor能夠保證任務依照在工做隊列的排隊順序來串行執行。
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
複製代碼
ThreadPoolExecutor.execute(Runnable command)方法,便可向線程池內添加一個任務
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
//獲取當前線程池的狀態
int c = ctl.get();
//若當前線程數量小於corePoolSize,則建立一個新的線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判斷當前線程是否處於運行狀態,且寫入任務阻塞隊列是否成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次獲取線程狀態進行雙重檢查;若是線程變成非運行狀態,則從阻塞隊列移除任務;
if (! isRunning(recheck) && remove(command))
//執行拒絕策略
reject(command);
//若當前線程池爲空,則新建一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//當前線程爲非運行狀態而且嘗試新建線程,若失敗則執行拒絕策略。
else if (!addWorker(command, false))
reject(command);
}
複製代碼
1)若當前線程數小於corePoolSize
,則調用addWorker()方法建立線程執行任務。 2)若當前線程不小於corePoolSize
,則將任務添加到workQueue隊列,等待空閒線程來執行。 3)若隊列裏的任務數到達上限
,且當前運行線程小於maximumPoolSize
,任務入workQueue隊列失敗,新建線程執行任務; 4)若建立線程也失敗(隊列任務數到達上限
,且當前線程數達到了maximumPoolSize
),對於新加入的任務,就會調用reject()(內部調用handler)拒絕接受任務。
ThreadPoolExecutor的飽和策略能夠經過調用setRejectedExecutionHandler
來修改。JDK提供了幾種不一樣的RejectedExecutionHandler實現,每種實現都包含有不一樣的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。 拒絕策略以下:
RejectedExecutionHandler rejected = null;
//默認策略,阻塞隊列滿,則丟任務、拋出異常
rejected = new ThreadPoolExecutor.AbortPolicy();
//阻塞隊列滿,則丟任務,不拋異常
rejected = new ThreadPoolExecutor.DiscardPolicy();
//刪除隊列中最舊的任務(最先進入隊列的任務),嘗試從新提交新的任務
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();
//隊列滿,不丟任務,不拋異常,若添加到線程池失敗,那麼主線程會本身去執行該任務
rejected = new ThreadPoolExecutor.CallerRunsPolicy();
複製代碼
(1)AbortPolicy、DiscardPolicy和DiscardOldestPolicy AbortPolicy
是默認的飽和策略
,就是停止任務,該策略將拋出RejectedExecutionException。調用者能夠捕獲這個異常而後去編寫代碼處理異常。 當新提交的任務沒法保存到隊列
中等待執行時,DiscardPolicy
會悄悄的拋棄該任務
。 DiscardOldestPolicy
則會拋棄最舊的
(下一個將被執行的任務),而後嘗試從新提交新的任務。若是工做隊列是那個優先級隊列時,搭配DiscardOldestPolicy飽和策略會致使優先級最高的那個任務被拋棄,因此二者不要組合使用。 (2)CallerRunsPolicy CallerRunsPolicy是「調用者運行」策略,實現了一種調節機制 。它不會拋棄任務
,也不會拋出異常
。 而是將任務回退到調用者
。它不會在線程池中
執行任務,而是在一個調用了execute的線程中
執行該任務。在線程滿後,新任務將交由調用線程池execute方法的主線程執行,而因爲主線程在忙碌,因此不會執行accept方法,從而實現了一種平緩的性能下降。 當工做隊列被填滿後,沒有預約義的飽和策略來阻塞execute(除了拋棄就是停止還有去讓調用者去執行)。然而能夠經過Semaphore來限制任務的到達率。
參考 《Java併發編程實戰》 jdk 1.8 源碼包