版權聲明:本文爲博主原創文章,未經博主容許不得轉載
源碼:github.com/AnliaLee
你們要是看到有錯誤的地方或者有啥好的建議,歡迎留言評論html
本篇博客咱們將開始探索由上一章引出的線程池(ThreadPoolExecutor)的知識。因爲內含大量示例,致使文章篇幅有點長,望你們耐心食用...java
往期回顧
大話Android多線程(一) Thread和Runnable的聯繫和區別
大話Android多線程(二) synchronized使用解析
大話Android多線程(三) 線程間的通訊機制之Handler
大話Android多線程(四) Callable、Future和FutureTaskandroid
簡介這東西也寫不出啥花樣來,遂直接偷懶引用別人的吧哈哈git
new Thread()的缺點
• 每次new Thread()耗費性能
• 調用new Thread()建立的線程缺少管理,被稱爲野線程,並且能夠無限制建立,之間相互競爭,會致使過多佔用系統資源致使系統癱瘓
• 不利於擴展,好比如定時執行、按期執行、線程中斷
採用線程池的優勢
• 重用存在的線程,減小對象建立、消亡的開銷,性能佳
• 可有效控制最大併發線程數,提升系統資源的使用率,同時避免過多資源競爭,避免堵塞
• 提供定時執行、按期執行、單線程、併發數控制等功能程序員
以上內容摘自Android線程管理之ExecutorService線程池github
線程池ThreadPoolExecutor的繼承關係以下圖所示算法
下一節咱們將介紹ThreadPoolExecutor的構造參數數組
構造ThreadPoolExecutor時需傳入許多參數,咱們以參數最多的那個構造方法爲例(由於參數threadFactory和handler是有默認值的,因此和其餘幾個構造方法的區別只是有無設置這兩個參數的入口而已,就不贅述了)緩存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
咱們將參數分析代入到程序員開發的故事中,讓你們更容易理解,如下是參數介紹多線程
int corePoolSize:
計劃招聘核心程序員的數量。核心程序員是公司的頂樑柱,公司接到甲方需求(即任務)後會優先分配給核心程序員去開發
線程池中核心線程的數量
int maximumPoolSize:
計劃招聘程序員的總數。程序員由核心程序員和實習生組成
線程池中線程數的最大值
long keepAliveTime:
容許員工打醬油的時間。公司招了實習生以後,若是發現一段時間(keepAliveTime)內實習生沒活幹,在那偷懶刷什麼掘金沸點的時候,就會把他辭掉。固然核心程序員抱着的也不必定是鐵飯碗,若公司採起了節省成本的經營策略(ThreadPoolExecutor.allowCoreThreadTimeOut設爲true),核心程序員一段時間沒活幹也同樣會被裁人
線程的閒置時長,默認狀況下此參數只做用於非核心線程,即非核心線程閒置時間超過keepAliveTime後就會被回收。但若是ThreadPoolExecutor.allowCoreThreadTimeOut設爲true,則參數一樣能夠做用於核心線程
TimeUnit unit:
上面時間參數的單位,有納秒、微秒、毫秒、秒、分、時、天
可供選擇的單位類型有:
TimeUnit.NANOSECONDS:納秒
TimeUnit.MICROSECONDS:微秒
TimeUnit.MILLISECONDS:毫秒
TimeUnit.SECONDS:秒
TimeUnit.MINUTES:分
TimeUnit.HOURS:小時
TimeUnit.DAYS:天
BlockingQueue<Runnable> workQueue:
儲備任務的隊列
線程池中的任務隊列,該隊列主要用來存儲已經提交但還沒有分配給線程執行的任務。BlockingQueue,即阻塞隊列,可供傳入的隊列類型有:
• ArrayBlockingQueue:基於數組的阻塞隊列
• LinkedBlockingQueue:基於鏈表的阻塞隊列
• PriorityBlockingQueue:基於優先級的阻塞隊列
• DelayQueue:基於延遲時間優先級的阻塞隊列
• SynchronousQueue:基於同步的阻塞隊列
咱們在下面的章節中將會詳細對比以上這幾種隊列的區別。此外,還需注意傳入任務的都需實現Runnable接口
ThreadFactory threadFactory:
線程工廠接口,只有一個new Thread(Runnable r)方法,能夠爲線程池建立新線程。系統爲咱們提供了默認的threadFactory:Executors.defaultThreadFactory(),咱們通常使用默認的就能夠了
RejectedExecutionHandler handler:
拒絕策略,默認使用ThreadPoolExecutor.AbortPolicy,當新任務被拒絕時會將拋出RejectExecutorException異常。此外還有3種策略可供選擇:CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy
當咱們使用submit或者execute方法將任務提交到線程池時,線程池遵循如下策略將任務分配給相應線程去執行(任務隊列使用最基本的ArrayBlockingQueue)
提交任務後,若是線程池中的線程數未達到核心線程的數量(corePoolSize),則會建立一個核心線程去執行
舉個栗子,咱們設置任務數(taskSize)爲3,核心線程數(corePoolSize)爲5,則 taskSize < corePoolSize,線程池會建立3條核心線程去執行任務(假設每一個任務都須要必定時間才能完成)
public class ExecutorTest {
//省略部分代碼...
private static int taskSize = 3;//任務數
private static int corePoolSize = 5;//核心線程的數量
private static int maximumPoolSize = 20;//線程數的最大值
private static int queueSize = 128;//可儲存的任務數
public static class TestTask implements Runnable {
public void run() {
if (taskSize > 0) {
try{
Thread.sleep(500);//模擬開發時間
System.out.println(getTime() + getName(Thread.currentThread().getName())
+ " 完成一個開發任務,編號爲t" + (taskSize--)
);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize)
);
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
executor.execute(task);
}
executor.shutdown();
}
}
複製代碼
運行結果見下圖(請忽略任務編號,如今還沒用到)
提交任務後,若是線程池中的線程數已經達到核心線程的數量(corePoolSize),但任務隊列(workQueue)中存儲的任務數未達到最大值,則將任務存入任務隊列中等待執行
咱們將任務數設爲10,核心線程數設爲3,任務隊列的最大值設爲7,此時將任務分配給核心線程後恰好能夠填滿任務隊列
private static int taskSize = 10;//任務數
private static int corePoolSize = 3;//核心線程的數量
private static int maximumPoolSize = 10;//線程數的最大值
private static int queueSize = 7;//可儲存的任務數
複製代碼
運行結果見下圖
提交任務後,若是線程池中的線程數達到核心線程數但未超過線程數的最大值,同時任務隊列中的任務數已達到最大值,則建立一個非核心線程來執行任務
咱們將以前的任務數改成12,其餘數值不變,那麼將會有兩位實習生參與到開發中(taskSize - (corePoolSize + queueSize) = 2)
private static int taskSize = 12;//任務數
private static int corePoolSize = 3;//核心線程的數量
private static int maximumPoolSize = 10;//線程數的最大值
private static int queueSize = 7;//可儲存的任務數
複製代碼
運行結果見下圖(多了實習生4和5)
加...加班???
正確答案應該是推掉!拒絕!
提交任務後,若線程池中的線程數已達到最大值,且全部線程均在執行任務,任務隊列也飽和了,則拒絕執行該任務,並根據拒絕策略執行相應操做
上個例子中一共建立了5條線程(3核心線程、2非核心線程),那麼此次咱們只將線程數的最大值改成4,採用默認的拒絕策略
private static int taskSize = 12;//任務數
private static int corePoolSize = 3;//核心線程的數量
private static int maximumPoolSize = 4;//線程數的最大值
private static int queueSize = 7;//可儲存的任務數
public static void main(String args[]){
//省略部分代碼...
for (int i = 0; i < size; i++) {
executor.execute(task);
System.out.println("接到任務 " + i);
}
executor.shutdown();
}
複製代碼
運行結果見下圖,能夠看見線程池只接收了11個任務(maximumPoolSize + queueSize = 11 ),在提交第12個任務後會拋出RejectedExecutionException的異常
另外須要注意的是,若是咱們在提交任務時拋出了異常,那麼以後調用的shutdown()將變爲無效代碼,線程池將一直運行在主線程中沒法關閉
在 corePoolSize = 0 的條件下,提交任務後,若任務隊列中的任務數仍未達到最大值,線程池只會建立一條非核心線程來執行任務
private static int taskSize = 9;//任務數
private static int corePoolSize = 0;//核心線程的數量
private static int maximumPoolSize = 5;//線程數的最大值
private static int queueSize = 10;//可儲存的任務數
複製代碼
運行結果見下圖
BlockingQueue是一個接口,它提供了3個添加元素方法:
3個刪除元素的方法:
咱們以前講到的5種類型的隊列實際上都是BlockingQueue的實現類,本篇博客不會具體分析源碼的實現,咱們只對比它們使用上的區別:
基於數組的阻塞隊列,ArrayBlockingQueue內部維護了一個由大小固定的數組構成的數據緩衝隊列,數組大小在隊列初始化時就須要指定。而存儲在ArrayBlockingQueue中的元素需按照FIFO(先進先出)的方式來進行存取。ArrayBlockingQueue的構造方法以下
ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
複製代碼
咱們這裏僅以一個參數的構造方法爲例,使用ArrayBlockingQueue時的任務執行策略和具體使用示例見上一節,就不重複再講一次了
基於鏈表(單向鏈表)的阻塞隊列,和ArrayBlockingQueue相似,其內部維護着一個由單向鏈表構成的數據緩衝隊列。區別於ArrayBlockingQueue,LinkedBlockingQueue在初始化的時候能夠不用設置容量大小,其默認大小爲Integer.MAX_VALUE(即2的31次方-1,表示 int 類型可以表示的最大值)。若設置了大小,則使用起來和ArrayBlockingQueue同樣。LinkedBlockingQueue的構造方法以下
LinkedBlockingQueue()
LinkedBlockingQueue(int capacity)
LinkedBlockingQueue(Collection<? extends E> c)
複製代碼
這裏參數和以前講的同樣,並且使用方法和ArrayBlockingQueue大同小異,就不贅述了
基於優先級的阻塞隊列,用法相似於LinkedBlockingQueue,區別在於其存儲的元素不是按照FIFO排序的,這些元素的排序規則得由咱們本身來定義:全部插入PriorityBlockingQueue的對象元素必須實現Comparable(天然排序)接口,咱們對Comparable接口的實現定義了隊列優先級的排序規則
PriorityBlockingQueue的隊列容量是「無界」的,由於新任務進來時若是發現已經超過了隊列的初始容量,則會執行擴容的操做。這意味着若是corePoolSize > 0,線程池中的線程數達到核心線程數的最大值,且任務隊列中的任務數也達到最大值,這時新的任務提交進來,線程池並不會建立非核心線程來執行新任務,而是對任務隊列進行擴容
更加具體的內容你們能夠去研究一下這篇篇博客:併發隊列 – 無界阻塞優先級隊列 PriorityBlockingQueue 原理探究
PriorityBlockingQueue的構造方法以下
PriorityBlockingQueue()
PriorityBlockingQueue(int initialCapacity)
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
PriorityBlockingQueue(Collection<? extends E> c)
複製代碼
重複的參數就不解釋了
下面咱們來看看具體示例(以最簡單的場景體驗一下使用過程): 咱們將核心程序員的數量(corePoolSize)設爲0,任務隊列的容量使用默認的(11),這時公司接到了12個開發任務,項目經理會根據任務的優先級對任務執行的順序進行排序,而後分配給實習生(公司只能招到一個實習生,緣由咱們在解析任務執行策略的時候已經講過了)
public class ExecutorTest {
//省略部分代碼...
private static int taskSize = 9;//任務數
private static int corePoolSize = 0;//核心線程的數量
private static int maximumPoolSize = 5;//線程數的最大值
private static int queueSize = 10;//可儲存的任務數
public static class PriorityTask implements Runnable,Comparable<PriorityTask>{
private int priority;
public PriorityTask(int priority) {
this.priority = priority;
}
@Override
public void run() {
if (taskSize > 0) {
try{
Thread.sleep(1000);//模擬開發時間
System.out.println(getTime() + getName(Thread.currentThread().getName())
+ " 完成一個開發任務,編號爲t" + (taskSize--) + ", 優先級爲:" + priority
);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public int compareTo(PriorityTask task) {
if(this.priority == task.priority){
return 0;
}
return this.priority<task.priority?1:-1;//優先級大的先執行
}
}
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>(queueSize)
);
Random random = new Random();
PriorityTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
int p = random.nextInt(100);
task = new PriorityTask(p);
executor.execute(task);
System.out.println("接到任務 " + i + ",優先級爲:" + p);
}
executor.shutdown();
}
}
複製代碼
運行結果見下圖
基於延遲時間優先級的阻塞隊列,DelayQueue和PriorityBlockingQueue很是類似,一樣是「無界」隊列(DelayQueue不須要設置初始容量大小),一樣基於優先級進行排序。但有一點不一樣,DelayQueue中的元素必須實現 Delayed接口(Delayed繼承自Comparable接口),咱們需重寫Delayed.getDelay方法爲元素的釋放(執行任務)設置延遲(getDelay方法的返回值是隊列元素被釋放前的保持時間,若是返回0或一個負值,就意味着該元素已經到期須要被釋放,所以咱們通常用完成時間和當前系統時間做比較)。DelayQueue的構造方法以下
DelayQueue()
DelayQueue(Collection<? extends E> c)
複製代碼
此次咱們將延遲時間當成是任務開發時間,設置開發時間越短的任務優先級越高
public class ExecutorTest {
//省略部分代碼...
private static int taskSize = 5;//任務數
private static int corePoolSize = 0;//核心線程的數量
private static int maximumPoolSize = 5;//線程數的最大值
public static class DelayTask implements Runnable,Delayed{
private long finishTime;
private long delay;
public DelayTask(long delay){
this. delay= delay;
finishTime = (delay + System.currentTimeMillis());//計算出完成時間
}
@Override
public void run() {
if (taskSize > 0) {
try{
System.out.println(getTime() + getName(Thread.currentThread().getName())
+ " 完成一個開發任務,編號爲t" + (taskSize--) + ", 用時:" + delay/1000
);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public long getDelay(@NonNull TimeUnit unit) {
//將完成時間和當前時間做比較,<=0 時說明元素到期需被釋放
return (finishTime - System.currentTimeMillis());
}
@Override
public int compareTo(@NonNull Delayed o) {
DelayTask temp = (DelayTask) o;
return temp.delay < this.delay?1:-1;//延遲時間越短優先級越高
}
}
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new DelayQueue()
);
Random random = new Random();
DelayTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
long d = 1000 + random.nextInt(10000);
task = new DelayTask(d);
executor.execute(task);
System.out.println("接到任務 " + i + ",預計完成時間爲:" + d/1000);
}
executor.shutdown();
}
}
複製代碼
運行結果以下
基於同步的阻塞隊列,這是一個很是特殊的隊列,由於它內部並沒有數據緩存空間,元素只有在試圖取走的時候纔有可能存在。也就是說,若是在插入元素時後續沒有執行取出的操做,那麼插入的行爲就會被阻塞,若是SynchronousQueue是在線程池中使用的,那麼這種場景下就會拋出RejectedExecutionException異常。可能這麼解釋有點繞,下面咱們會經過講解示例輔助你們理解,先來看構造方法
SynchronousQueue()
SynchronousQueue(boolean fair)
複製代碼
一樣的,參數和以前同樣,就不解釋了,咱們來看示例:
採用了SynchronousQueue的策略後,任務隊列不能儲存任務了。這意味着若是接到新任務時發現沒人有空來開發(程序員手上都有任務,公司招人名額也滿了),那這個新任務就泡湯了(拋出異常)
例如咱們將核心程序員的數量(corePoolSize)設爲3,程序員總數(maximumPoolSize)設爲9,而任務數(taskSize)設爲10
public class ExecutorTest {
//省略部分代碼...
private static int taskSize = 10;//任務數
private static int corePoolSize = 3;//核心線程的數量
private static int maximumPoolSize = 9;//線程數的最大值
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
executor.execute(task);
System.out.println("接到任務 " + i);
}
executor.shutdown();
}
}
複製代碼
在招滿人的狀況下,公司最多就9個程序員,當接到第10個任務時,發現沒人可用了,就會拋出異常。固然,以前成功接收的任務不會受到影響
所以根據SynchronousQueue的特性,在使用SynchronousQueue時一般會將maximumPoolSize設爲「無邊界」,即Integer.MAX_VALUE(在系統爲咱們預設的線程池中,CachedThreadPool就是這麼設置的,具體的咱們後面再細說)
前面講了這麼多,其實都是教你們如何自定義一個線程池。系統爲了方便咱們進行開發,早已封裝好了各類線程池供咱們使用。咱們能夠用Executors.newXXX的方式去實例化咱們須要的線程池。可供選擇的線程池種類不少:
咱們挑其中經常使用的4種講講就行(其實各類線程池的區別只是構建線程池時傳入的參數不一樣而已,通過以前咱們對任務執行策略和各類任務隊列的講解後,理解不一樣種類的線程池就變得很是簡單了。這也正是博主要花費那麼長的篇幅給你們舉例子的緣由,但願你們都能看得懂吧~)
咱們直接看系統是如何封裝的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
}
複製代碼
核心線程數爲0,線程總數設爲Integer.MAX_VALUE,線程的閒置時長爲60s,任務隊列爲SynchronousQueue(同步隊列),結合咱們以前說的,能夠總結出CachedThreadPool的特色以下:
使用示例以下,咱們設置任務數爲10
ExecutorService service = Executors.newCachedThreadPool();
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
service.execute(task);
}
service.shutdown();
複製代碼
源碼以下
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
}
複製代碼
核心線程數由咱們自定義,最大線程數和核心線程數相等,線程閒置時間爲0,任務隊列爲LinkedBlockingQueue,因此FixedThreadPool的特色以下:
使用示例以下,咱們設置任務數爲10,核心線程數爲5
ExecutorService service = Executors.newFixedThreadPool(corePoolSize);
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
service.execute(task);
}
service.shutdown();
複製代碼
源碼以下
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
核心線程數爲1,最大線程數也是1,線程閒置時間爲0,任務隊列爲LinkedBlockingQueue,且SingleThreadExecutor是FinalizableDelegatedExecutorService類的實例,因此SingleThreadExecutor的特色以下:
使用示例以下,咱們設置任務數爲10
ExecutorService service = Executors.newSingleThreadExecutor();
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
service.execute(task);
}
service.shutdown();
複製代碼
源碼以下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(
corePoolSize,
Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, //DEFAULT_KEEPALIVE_MILLIS = 10L
MILLISECONDS,
new DelayedWorkQueue()
);
}
複製代碼
核心線程數由咱們自定義,最大線程數爲Integer.MAX_VALUE,線程閒置時長爲10毫秒,任務隊列採用了DelayedWorkQueue(和DelayQueue很是像)。ScheduledThreadPool的特色以下:
使用示例以下,咱們調用ScheduledExecutorService.schedule方法提交延遲啓動的任務,延遲時間爲3秒:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
Runnable runnable = new Runnable(){
@Override
public void run() {
System.out.println("開始執行任務,時間:" + getTime());
}
};
scheduledExecutorService.schedule(runnable,3,TimeUnit.SECONDS);
System.out.println("提交任務,時間:" + getTime());
複製代碼
此外還有scheduleAtFixedRate,scheduleWithFixedDelay等提交任務的方法,就不一一舉例了
一、咱們除了用execute方法提交任務之外,還可使用submit方法。submit方法提交的任務需實現Callable接口(有關Callable的知識能夠看下我上一篇博客:大話Android多線程(四) Callable、Future和FutureTask),所以其具備返回值
二、線程池有兩種手動關閉的方法:
emmmm...基本就這些內容了,博主已經儘量地覆蓋線程池的全部知識了(除了源碼解析,之後有機會會出一個單章分析下源碼),如有什麼遺漏或者建議的歡迎留言評論。若是以爲博主寫得還不錯麻煩點個贊,大家的支持是我最大的動力~