本篇是多線程系列的第三篇,若是對前兩篇感興趣的也能夠去看看。前端
多線程(一)、基礎概念及notify()和wait()的使用java
Android進階系列文章是我在學習的同時對知識點的整理,一是爲了加深印象,二是方便後續查閱。web
若是文中有錯誤的地方,歡迎批評指出。數據庫
若是在Android裏面,直接 new Thread
,阿里巴巴 Android 開發規範會提示你不要顯示建立線程,請使用線程池,爲啥要用線程池?你對線程池瞭解多少?後端
在 多線程(一)、基礎概念及notify()和wait()的使用 講了線程的建立,每當有任務來的時候,經過建立一個線程來執行任務,當任務執行結束,對線程進行銷燬,併發操做的時候,大量任務須要執行,每一個任務都要須要重複線程的建立、執行、銷燬,形成了CPU的資源銷燬,並下降了響應速度。數組
new Thread(new Runnable() {
@Override
public void run() {
// 任務執行
}
}).start();
複製代碼
**線程池 **:字面上理解就是將線程經過一個池子進行管理,當任務來的時候,從池子中取出一個已經建立好的線程進行任務的執行,執行結束後再將線程放回池中,待線程池銷燬的時候再統一對線程進行銷燬。緩存
經過上面的對比,使用線程池基本有之前好處:網絡
一、下降資源消耗。經過重複使用線程池中的線程,下降了線程建立和銷燬帶來的資源消耗。多線程
二、提升響應速度。重複使用池中線程,減小了重複建立和銷燬線程帶來的時間開銷。
三、提升線程的可管理性。線程是稀缺資源,咱們不可能無節制建立,這樣會大量消耗系統資源,使用線程池能夠統一分配,管理和監控線程。
要使用線程池,就必需要用到 ThreadPoolExecutor
類,
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
這裏貼了 ThreadPoolExecutor
最複雜的一個構造方法,咱們把參數單獨拎出來說
一、int corePoolSize
核心線程數量:每當接收到一個任務的時候,線程池會建立一個新的線程來執行任務,直到當前線程池中的線程數目等於 corePoolSize
,當任務大於corePoolSize
時候,會放入阻塞隊列
二、int maximumPoolSize
非核心線程數量:線程池中容許的最大線程數,若是當前阻塞隊列滿了,當接收到新的任務就會再次建立線程進行執行,直到線程池中的數目等於maximumPoolSize
三、long keepAliveTime
線程空閒時存活時間:當線程數大於沒有任務執行的時候,繼續存活的時間,默認該參數只有線程數大於corePoolSize
時纔有用
四、TimeUnit unit
keepAliveTime的時間單位
五、BlockingQueue workQueue
阻塞隊列:當線程池中線程數目超過 corePoolSize
的時候,線程會進入阻塞隊列進行阻塞等待,當阻塞隊列滿了的時候,會根據 maximumPoolSize
數量新開線程執行。
隊列:
是一種特殊的線性表,特殊之處在於它只容許在表的前端(front)進行刪除操做,而在表的後端(rear)進行插入操做,和棧同樣,隊列是一種操做受限制的線性表。
進行插入操做的端稱爲隊尾,進行刪除操做的端稱爲隊頭。
隊列中沒有元素時,稱爲空隊列。隊列的數據元素又稱爲隊列元素。
在隊列中插入一個隊列元素稱爲入隊,從隊列中刪除一個隊列元素稱爲出隊。
由於隊列只容許在一端插入,在另外一端刪除,因此只有最先進入隊列的元素才能最早從隊列中刪除,故隊列又稱爲先進先出(FIFO—first in first out)線性表。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的緩存容器,而消費者也只從容器裏拿元素。
先看看 BlockingQueue
,它是一個接口,繼承 Queue
public interface BlockingQueue<E> extends Queue<E>
複製代碼
再看看它裏面的方法
針對這幾個方法,簡單的進行介紹:
拋出異常 | 返回特殊值 | 阻塞 | 超時 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
檢查 | element() | peek() |
• 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
• 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null
• 阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
• 超時:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。
咱們再看JDK爲咱們提供的一些阻塞隊列,以下圖:
簡單說明:
阻塞隊列 | 用法 |
---|---|
ArrayBlockingQueue | 一個由數組結構組成的有界阻塞隊列。 |
LinkedBlockingQueue | 一個由鏈表結構組成的有界阻塞隊列。 |
PriorityBlockingQueue | 一個支持優先級排序的無界阻塞隊列。 |
DelayQueue | 一個使用優先級隊列實現的無界阻塞隊列。 |
SynchronousQueue | 一個不存儲元素的阻塞隊列。 |
LinkedTransferQueue | 一個由鏈表結構組成的無界阻塞隊列。 |
LinkedBlockingDeque | 一個由鏈表結構組成的雙向阻塞隊列。 |
六、ThreadFactory threadFactory
建立線程的工廠,經過自定義的線程工廠能夠給每一個新建的線程設置一個具備識別度的線程名Executors靜態工廠裏默認的threadFactory,線程的命名規則是「pool-數字-thread-數字」。
七、RejectedExecutionHandler handler (飽和策略)
線程池的飽和策略,若是任務特別多,隊列也滿了,且沒有空閒線程進行處理,線程池將必須對新的任務採起飽和策略,即提供一種方式來處理這部分任務。
jdk 給咱們提供了四種策略,如圖:
策略 | 做用 |
---|---|
AbortPolicy | 直接拋出異常,該策略也爲默認策略 |
CallerRunsPolicy | 在調用者線程中執行該任務 |
DiscardOldestPolicy | 丟棄阻塞隊列最前面的任務,並執行當前任務 |
DiscardPolicy | 直接丟棄任務 |
咱們能夠看到 RejectedExecutionHandler
實際是一個接口,且只有一個 rejectedExecution
因此咱們能夠根據本身的需求定義本身的飽和策略。
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
*
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
複製代碼
熟悉了上面線程池的各個參數含義,對線程池的工做原理,咱們也能夠大體總結以下:
一、線程池剛建立的時候,裏面沒有線程在運行,當有任務進來,而且線程池開始執行的時候,會根據實際狀況處理。
二、當前線程池線程數量少於 corePoolSize
時候,每當有新的任務來時,都會建立一個新的線程進行執行。
三、當線程池中運行的線程數大於等於 corePoolSize
,每當有新的任務來的時候,都會加入阻塞隊列中。
四、當阻塞隊列加滿,沒法再加入新的任務的時候,則會再根據 maximumPoolSize
數 來建立新的非核心線程執行任務。
四、當線程池中線程數目大於等於 maximumPoolSize
時候,當有新的任務來的時候,拒絕執行該任務,採起飽和策略。
五、當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize
的大小。
直接經過 ThreadPoolExecutor 建立:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 10
, 1, TimeUnit.SECONDS
, new LinkedBlockingQueue<Runnable>(50)
, Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy());
複製代碼
經過工具類java.util.concurrent.Executors
建立的線程池,其實質也是調用 ThreadPoolExecutor
進行建立,只是針對不一樣的需求,對參數進行了設置。
可重用固定線程數
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
參數說明:
int corePoolSize: nThreads
int maximumPoolSize: nThreads
long keepAliveTime:0L
TimeUnit unit:TimeUnit.MILLISECONDS
BlockingQueue workQueue:new LinkedBlockingQueue()
能夠看到核心線程和非核心線程一致,及不會建立非核心線程,超時時間爲0,即就算線程處於空閒狀態,也不會對其進行回收,阻塞隊列爲LinkedBlockingQueue
無界阻塞隊列。
當有任務來的時候,先建立核心線程,線程數超過 corePoolSize
就進入阻塞隊列,當有空閒線程的時候,再在阻塞隊列中去任務執行。
使用場景:線程池線程數固定,且不會回收,線程生命週期與線程池生命週期同步,適用任務量比較固定且耗時的長的任務。
單線程執行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
參數說明:
int corePoolSize: 1
int maximumPoolSize: 1
long keepAliveTime:0L
TimeUnit unit:TimeUnit.MILLISECONDS
BlockingQueue workQueue:new LinkedBlockingQueue()
基本和 FixedThreadPool 一致,最明顯的區別就是線程池中只存在一個核心線程來執行任務。
使用場景:只有一個線程,確保因此任務都在一個線程中順序執行,不須要處理線程同步問題,適用多個任務順序執行。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
參數說明:
int corePoolSize: 0
int maximumPoolSize: Integer.MAX_VALUE
long keepAliveTime:60L
TimeUnit unit:TimeUnit.SECONDS
BlockingQueue workQueue:new SynchronousQueue()
無核心線程,非核心線程數量 Integer.MAX_VALUE,能夠無限建立,空閒線程60秒會被回收,任務隊列採用的是SynchronousQueue
,這個隊列是沒法插入任務的,一有任務當即執行。
使用場景:因爲非核心線程無限制,且使用沒法插入的SynchronousQueue
隊列,因此適合任務量大但耗時少的任務。
定時延時執行
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
複製代碼
參數說明:
int corePoolSize: corePoolSize (設定)
int maximumPoolSize: Integer.MAX_VALUE
long keepAliveTime:0
TimeUnit unit:NANOSECONDS
BlockingQueue workQueue:new DelayedWorkQueue()
核心線程數固定(設置),非核心線程數建立無限制,可是空閒時間爲0,即非核心線程一旦空閒就回收, DelayedWorkQueue()
無界隊列會將任務進行排序,延時執行隊列任務。
使用場景:newScheduledThreadPool是惟一一個具備定時按期執行任務功能的線程池。它適合執行一些週期性任務或者延時任務,能夠經過schedule(Runnable command, long delay, TimeUnit unit)
方法實現。
線程池提供了 execute
和 submit
兩個方法來執行
execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 得到當前線程的生命週期對應的二進制狀態碼
int c = ctl.get();
//判斷當前線程數量是否小於核心線程數量,若是小於就直接建立核心線程執行任務,建立成功直接跳出,失敗則接着往下走.
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判斷線程池是否爲RUNNING狀態,而且將任務添加至隊列中.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//審覈下線程池的狀態,若是不是RUNNING狀態,直接移除隊列中
if (! isRunning(recheck) && remove(command))
reject(command);
//若是當前線程數量爲0,則單首創建線程,而不指定任務.
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//若是不知足上述條件,嘗試建立一個非核心線程來執行任務,若是建立失敗,調用reject()方法.
else if (!addWorker(command, false))
reject(command);
}
複製代碼
submit():
源碼:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 將runnable封裝成 Future 對象
RunnableFuture<T> ftask = newTaskFor(task, result);
// 執行 execute 方法
execute(ftask);
// 返回包裝好的Runable
return ftask;
}
// newTaskFor : 經過 FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
複製代碼
其中 newTaskFor
返回的 RunnableFuture<T>
方法繼承了 Runnable
接口,因此能夠直接經過 execute
方法執行。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
複製代碼
能夠看到,submit 中實際也是調用了 execute() 方法,只不過在調用方法以前,先將Runnable
對象封裝成FutureTask
對象,而後再返回 Future<T>
,咱們能夠經過Future
的 get
方法,拿到任務執行結束後的返回值。
咱們在 多線程(一)、基礎概念及notify()和wait()的使用 中也講了 FutureTask
它提供了 cancel
、isCancelled
、isDone
、get
幾個方法,來對任務進行相應的操做。
總結:
一般狀況下,咱們不須要對線程或者獲取執行結果,能夠直接使用
execute
方法。若是咱們要獲取任務執行的結果,或者想對任務進行取消等操做,就使用
submit
方法。
關於線程的中斷在 多線程(一)、基礎概念及notify()和wait()的使用 裏面有介紹。
線程池的參數比較靈活,咱們能夠自由設置,可是具體每一個參數該設置成多少比較合理呢?這個要根據咱們處理的任務來決定,對任務通常從如下幾個點分析:
8.一、任務的性質
CPU 密集型、IO 密集型、混合型
CPU密集型應配置儘量小的線程,如Ncpu+1個線程的線程池
IO密集型,IO操做有關,如磁盤,內存,網絡等等,對CPU的要求不高則應配置儘量多的線程,如2*Ncpu個線程的線程池
混合型須要拆成CPU 密集型和IO 密集型分別分析,根據任務數量和執行時間,來決定線程的數量
8.二、任務的優先級
高中低優先級
8.三、任務執行時間
長中短
8.四、任務的依耐性
是否須要依賴其餘系統資源,如數據庫鏈接
Runtime.getRuntime().availableProcessors() : 當前設備的CPU個數
又巴拉巴拉說了一大推,我以爲惟有代碼運行,經過結果分析最能打動人心,下面就經過代碼運行結果來分析。
先看這麼一段代碼:
public static void main(String[] args) {
// 一、經過 ThreadPoolExecutor 建立基本線程池
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(50));
for (int i = 0; i < 30; i++) {
final int num = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
// 睡兩秒後執行
Thread.sleep(2000);
System.out.println("run : " + num + " 當前線程:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 執行
threadPoolExecutor.execute(runnable);
}
}
複製代碼
咱們經過 ThreadPoolExecutor
建立了一個線程池,而後執行30個任務。
參數說明:
int corePoolSize: 3
int maximumPoolSize: 5
long keepAliveTime:1
TimeUnit unit:TimeUnit.SECONDS
BlockingQueue workQueue:new LinkedBlockingQueue(50)
線程池核心線程數爲3,非核心線程數爲5,非核心線程空閒1秒被回收,阻塞隊列使用了 new LinkedBlockingQueue
並指定了隊列容量爲50。
結果:
咱們看到每兩秒後,有三個任務被執行。這是由於核心咱們設置的核心線程數爲3,當多餘的任務到來後,會先放入到阻塞隊列中,又因爲咱們設置的阻塞隊列容量爲50,因此,阻塞隊列永遠不會滿,就不會啓動非核心線程。
咱們改一下咱們的線程池以下:
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(25));
複製代碼
參數就不分析了,咱們直接看結果:
咱們看到此次每隔兩秒有五個任務在執行,爲何?這裏要根據咱們前面線程池的工做原理來分析,咱們有三十個任務須要執行,核心線程數爲2,其他的任務放入阻塞隊列中,阻塞隊列容量爲25,剩餘任務不超過非核心線程數,當阻塞隊列滿的時候,就啓動了非核心線程來執行。
咱們再簡單改一下咱們的線程池,代碼以下:
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(24));
複製代碼
相比上面的,咱們就將阻塞隊列容量改爲了24,若是上面你對線程池的工做原理清楚了,你應該能知道我這裏改爲 24 的良苦用心了,咱們先看結果。
最直接的就是拋異常了,可是線程池仍然再執行任務,首先爲啥拋異常?首先,咱們須要執行三十個任務,可是咱們的阻塞隊列容量爲 24,隊列滿後啓動了非核心線程,可是非核心線程數量爲5,當剩下的這個任務來的時候,線程池將採起飽和策略,咱們沒有設置,默認爲 AbortPolicy
,即直接拋異常,若是咱們手動設置飽和策略以下:
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(24),new ThreadPoolExecutor.DiscardPolicy());
複製代碼
咱們這裏採用的飽和策略爲 DiscardPolicy
,即丟棄多餘任務。最終能夠看到結果沒有拋異常,最終只執行了29個任務,最後一個任務被拋棄了。
最後再看一下經過 Executors 靜態方法建立的線程池運行上面的任務結果如何,Executors 建立的線程池本質也是經過建立 ThreadPoolExecutor
來執行,可結合上面分析自行總結。
一、FixedThreadPool
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(3);
複製代碼
結果:
二、newSingleThreadExecutor
ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
複製代碼
結果:
三、newCachedThreadPool
ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
複製代碼
結果 :
四、newScheduledThreadPool
ScheduledExecutorService threadPoolExecutor = Executors.newScheduledThreadPool(3);
複製代碼
這是多線程的第三篇,這篇文章篇幅有點多, 有點小亂,後續會再整理一下,基本都是跟着本身的思路,在寫的同時,本身也會再操做一遍,源碼分析過程當中,也會盡量的詳細,一步步的深刻,後續查閱的時候也方便,文章中有些不是很詳細的地方,後面可能會再次更新,或者單獨用一篇文章來說。