java併發編程:Executor、Executors、ExecutorService

Executors
    在Java 5以後,併發編程引入了一堆新的啓動、調度和管理線程的API。Executor框架即是Java 5中引入的,其內部使用了線程池機制,它在java.util.cocurrent 包下,經過該框架來控制線程的啓動、執行和關閉,能夠簡化併發編程的操做。所以,在Java 5以後,經過Executor來啓動線程比使用Thread的start方法更好,除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點:有助於避免this逃逸問題——若是咱們在構造器中啓動一個線程,由於另外一個任務可能會在構造器結束以前開始執行,此時可能會訪問到初始化了一半的對象用Executor在構造器中。Eexecutor做爲靈活且強大的異步執行框架,其支持多種不一樣類型的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的線程至關於生產者,執行任務的線程至關於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命週期的支持,以及統計信息收集,應用程序管理機制和性能監視等機制。
1、Executor的UML圖:(經常使用的幾個接口和子類)java


Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。編程


2、Executor和ExecutorService
Executor:一個接口,其定義了一個接收Runnable對象的方法executor,其方法簽名爲executor(Runnable command),該方法接收一個Runable實例,它用來執行一個任務,任務即一個實現了Runnable接口的類,通常來講,Runnable任務開闢在新線程中的使用方法爲:new Thread(new RunnableTask())).start(),但在Executor中,可使用Executor而不用顯示地建立線程:executor.execute(new RunnableTask()); // 異步執行緩存

ExecutorService:是一個比Executor使用更普遍的子類接口,其提供了生命週期管理的方法,返回 Future 對象,以及可跟蹤一個或多個異步任務執行情況返回Future的方法;能夠調用ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,調用該方法後,將致使ExecutorService中止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另外一類是尚未開始執行的),當全部已經提交的任務執行完畢後將會關閉ExecutorService。所以咱們通常用該接口來實現和管理多線程。服務器

經過 ExecutorService.submit() 方法返回的 Future 對象,能夠調用isDone()方法查詢Future是否已經完成。當任務完成時,它具備一個結果,你能夠調用get()方法來獲取該結果。你也能夠不用isDone()進行檢查就直接調用get()獲取結果,在這種狀況下,get()將阻塞,直至結果準備就緒,還能夠取消任務的執行。Future 提供了 cancel() 方法用來取消執行 pending 中的任務。ExecutorService 部分代碼以下:多線程

public interface ExecutorService extends Executor {
void shutdown();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}併發

3、Executors類: 主要用於提供線程池相關的操做
Executors類,提供了一系列工廠方法用於建立線程池,返回的線程池都實現了ExecutorService接口。框架

一、public static ExecutorService newFiexedThreadPool(int Threads) 建立固定數目線程的線程池。異步

二、public static ExecutorService newCachedThreadPool():建立一個可緩存的線程池,調用execute 將重用之前構造的線程(若是線程可用)。若是沒有可用的線程,則建立一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。ide


三、public static ExecutorService newSingleThreadExecutor():建立一個單線程化的Executor。性能

 

四、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

建立一個支持定時及週期性的任務執行的線程池,多數狀況下可用來替代Timer類。

 


newCachedThreadPool()                                                                                                                                         

-緩存型池子,先查看池中有沒有之前創建的線程,若是有,就 reuse.若是沒有,就建一個新的線程加入池中
-緩存型池子一般用於執行一些生存期很短的異步型任務
 所以在一些面向鏈接的daemon型SERVER中用得很少。但對於生存期短的異步任務,它是Executor的首選。
-能reuse的線程,必須是timeout IDLE內的池中線程,缺省     timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
  注意,放入CachedThreadPool的線程沒必要擔憂其結束,超過TIMEOUT不活動,其會自動被終止。

 

newFixedThreadPool(int)                                                      

-newFixedThreadPool與cacheThreadPool差很少,也是能reuse就用,但不能隨時建新的線程
-其獨特之處:任意時間點,最多隻能有固定數目的活動線程存在,此時若是有新的線程要創建,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子
-和cacheThreadPool不一樣,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,確定很是長,相似依賴上層的TCP或UDP IDLE機制之類的),因此FixedThreadPool多數針對一些很穩定很固定的正規併發線程,多用於服務器
-從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,只不過參數不一樣:
fixed池線程數固定,而且是0秒IDLE(無IDLE)    
cache池線程數支持0-Integer.MAX_VALUE(顯然徹底沒考慮主機的資源承受能力),60秒IDLE  

newScheduledThreadPool(int)

-調度型線程池
-這個池子裏的線程能夠按schedule依次delay執行,或週期執行

SingleThreadExecutor()

-單例線程,任意時間池中只能有一個線程
-用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)

4、Executor VS  ExecutorService VS Executors
正如上面所說,這三者均是 Executor 框架中的一部分。Java 開發者頗有必要學習和理解他們,以便更高效的使用 Java 提供的不一樣類型的線程池。總結一下這三者間的區別,以便你們更好的理解:

Executor 和 ExecutorService 這兩個接口主要的區別是:ExecutorService 接口繼承了 Executor 接口,是 Executor 的子接口
Executor 和 ExecutorService 第二個區別是:Executor 接口定義了 execute()方法用來接收一個Runnable接口的對象,而 ExecutorService 接口中的 submit()方法能夠接受Runnable和Callable接口的對象。
Executor 和 ExecutorService 接口第三個區別是 Executor 中的 execute() 方法不返回任何結果,而 ExecutorService 中的 submit()方法能夠經過一個 Future 對象返回運算結果。
Executor 和 ExecutorService 接口第四個區別是除了容許客戶端提交一個任務,ExecutorService 還提供用來控制線程池的方法。好比:調用 shutDown() 方法終止線程池。能夠經過 《Java Concurrency in Practice》 一書瞭解更多關於關閉線程池和如何處理 pending 的任務的知識。
Executors 類提供工廠方法用來建立不一樣類型的線程池。好比: newSingleThreadExecutor() 建立一個只有一個線程的線程池,newFixedThreadPool(int numOfThreads)來建立固定線程數的線程池,newCachedThreadPool()能夠根據須要建立新的線程,但若是已有線程是空閒的會重用已有線程。
下面給出一個Executor執行Callable任務的示例代碼:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();

//建立10個任務並執行
for (int i = 0; i < 10; i++){
//使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//將任務執行結果存儲到List中
resultList.add(future);
}

//遍歷任務的結果
for (Future<String> fs : resultList){
try{
while(!fs.isDone);//Future返回若是沒有完成,則一直循環等待,直到Future返回完成
System.out.println(fs.get()); //打印各個線程(任務)執行的結果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//啓動一次順序關閉,執行之前提交的任務,但不接受新任務
executorService.shutdown();
}
}
}
}


class TaskWithResult implements Callable<String>{
private int id;

public TaskWithResult(int id){
this.id = id;
}

/**
* 任務的具體過程,一旦任務傳給ExecutorService的submit方法,
* 則該方法自動在一個線程上執行
*/
public String call() throws Exception {
System.out.println("call()方法被自動調用!!! " + Thread.currentThread().getName());
//該返回結果將被Future的get方法獲得
return "call()方法被自動調用,任務返回的結果是:" + id + " " + Thread.currentThread().getName();
}
}
5、自定義線程池
自定義線程池,能夠用ThreadPoolExecutor類建立,它有多個構造方法來建立線程池,用該類很容易實現自定義的線程池,這裏先貼上示例程序:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest{
public static void main(String[] args){
//建立等待隊列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//建立線程池,池中保存的線程數爲3,容許的最大線程數爲5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//建立七個任務
Runnable t1 = new MyThread();
Runnable t2 = new MyThread();
Runnable t3 = new MyThread();
Runnable t4 = new MyThread();
Runnable t5 = new MyThread();
Runnable t6 = new MyThread();
Runnable t7 = new MyThread();
//每一個任務會在一個線程上執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//關閉線程池
pool.shutdown();
}
}

class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在執行。。。");
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
 運行結果以下:

 從結果中能夠看出,七個任務是在線程池的三個線程上執行的。這裏簡要說明下用到的ThreadPoolExecuror類的構造方法中各個參數的含義。
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:線程池中所保存的核心線程數,包括空閒線程。

maximumPoolSize:池中容許的最大線程數。

keepAliveTime:線程池中的空閒線程所能持續的最長時間。

unit:持續時間的單位。

workQueue:任務執行前保存任務的隊列,僅保存由execute方法提交的Runnable任務。


根據ThreadPoolExecutor源碼前面大段的註釋,咱們能夠看出,當試圖經過excute方法將一個Runnable任務添加到線程池中時,按照以下順序來處理:
    一、若是線程池中的線程數量少於corePoolSize,即便線程池中有空閒線程,也會建立一個新的線程來執行新添加的任務;
    二、若是線程池中的線程數量大於等於corePoolSize,但緩衝隊列workQueue未滿,則將新添加的任務放到workQueue中,按照FIFO的原則依次等待執行(線程池中有線程空閒出來後依次將緩衝隊列中的任務交付給空閒的線程執行);

   三、若是線程池中的線程數量大於等於corePoolSize,且緩衝隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會建立新的線程來處理被添加的任務;


  四、若是線程池中的線程數量等於了maximumPoolSize,有4種處理方式(該構造方法調用了含有5個參數的構造方法,並將最後一個構造方法爲RejectedExecutionHandler類型,它在處理線程溢出時有4種方式,這裏再也不細說,要了解的,本身能夠閱讀下源碼)。

    總結起來,也便是說,當有新的任務要處理時,先看線程池中的線程數量是否大於corePoolSize,再看緩衝隊列workQueue是否滿,最後看線程池中的線程數量是否大於maximumPoolSize。

    另外,當線程池中的線程數量大於corePoolSize時,若是裏面有線程的空閒時間超過了keepAliveTime,就將其移除線程池,這樣,能夠動態地調整線程池中線程的數量。

 

咱們大體來看下Executors的源碼,newCachedThreadPool的不帶RejectedExecutionHandler參數(即第五個參數,線程數量超過maximumPoolSize時,指定處理方式)的構造方法以下:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
它將corePoolSize設定爲0,而將maximumPoolSize設定爲了Integer的最大值,線程空閒超過60秒,將會從線程池中移除。因爲核心線程數爲0,所以每次添加任務,都會先從線程池中找空閒線程,若是沒有就會建立一個線程(SynchronousQueue<Runnalbe>決定的,後面會說)來執行新的任務,並將該線程加入到線程池中,而最大容許的線程數爲Integer的最大值,所以這個線程池理論上能夠不斷擴大。
    再來看newFixedThreadPool的不帶RejectedExecutionHandler參數的構造方法,以下:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它將corePoolSize和maximumPoolSize都設定爲了nThreads,這樣便實現了線程池的大小的固定,不會動態地擴大,另外,keepAliveTime設定爲了0,也就是說線程只要空閒下來,就會被移除線程池,勇於LinkedBlockingQueue下面會說。
    下面說說幾種排隊的策略:

    一、直接提交。緩衝隊列採用 SynchronousQueue,它將任務直接交給線程處理而不保持它們。若是不存在可用於當即運行任務的線程(即線程池中的線程都在工做),則試圖把任務加入緩衝隊列將會失敗,所以會構造一個新的線程來處理新添加的任務,並將其加入到線程池中。直接提交一般要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以免拒絕新提交的任務。newCachedThreadPool採用的即是這種策略。

    二、無界隊列。使用無界隊列(典型的即是採用預約義容量的 LinkedBlockingQueue,理論上是該緩衝隊列能夠對無限多的任務排隊)將致使在全部 corePoolSize 線程都工做的狀況下將新任務加入到緩衝隊列中。這樣,建立的線程就不會超過 corePoolSize,也所以,maximumPoolSize 的值也就無效了。當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列。newFixedThreadPool採用的即是這種策略。

    三、有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(通常緩衝隊列使用ArrayBlockingQueue,並制定隊列的最大長度)有助於防止資源耗盡,可是可能較難調整和控制,隊列大小和最大池大小須要相互折衷,須要設定合理的參數。

6、比較Executor和new Thread()
new Thread的弊端以下:

a. 每次new Thread新建對象性能差。b. 線程缺少統一管理,可能無限制新建線程,相互之間競爭,及可能佔用過多系統資源致使死機或oom。c. 缺少更多功能,如定時執行、按期執行、線程中斷。相比new Thread,Java提供的四種線程池的好處在於:a. 重用存在的線程,減小對象建立、消亡的開銷,性能佳。b. 可有效控制最大併發線程數,提升系統資源的使用率,同時避免過多資源競爭,避免堵塞。c. 提供定時執行、按期執行、單線程、併發數控制等功能。————————————————版權聲明:本文爲CSDN博主「不斷前行的菜鳥_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接及本聲明。原文連接:https://blog.csdn.net/weixin_40304387/article/details/80508236

相關文章
相關標籤/搜索