掃描下方二維碼或者微信搜索公衆號
菜鳥飛呀飛
,便可關注微信公衆號,閱讀更多Spring源碼分析
和Java併發編程
文章。java
衆所周知,當咱們使用線程來運行Runnable
任務時,是不支持獲取返回值的,由於Runnable接口的run()方法使用void修飾
的,方法不支持返回值。而在不少場景下,咱們一方面須要經過線程來異步執行任務,以便提高性能,另外一方面還指望能獲取到任務的執行結果。尤爲是在RPC框架中,異步獲取任務返回值,幾乎是每個RPC接口要實現的功能。這個時候,使用Runnable顯然就沒法知足咱們的需求了,所以Callable
就出現了。編程
Callable與Runnable相似,它是一個接口,也只有一個方法:call()
,不一樣的是Callable的call()方法有是有返回值的,返回值的類型是一個泛型,泛型由建立Callable對象時指定。微信
public interface Callable<V> {
V call() throws Exception;
}
複製代碼
Runnable對象能夠傳入到Thread類的構造方法中,經過Thread來運行Runnable任務,而Callable接口則不能直接傳入到Thread中來運行,Callable接口一般結合線程池來使用。線程池ThreadPoolExecutor中除了提供execute()方法來提交任務之外,還提供了submit()的三個重載方法來提交任務,這三個方法均有返回值。 ThreadPoolExecutor類繼承了抽象類AbstractExecutorService
,在AbstractExecutorService中定義了submit()重載的三個方法。具體定義以下。數據結構
方法名 | 說明 |
---|---|
Future<?> submit(Runnable task) |
該方法雖然返回值對象是Future,可是因爲提交的是Runnable類型的任務,因此使用Future.get() 獲取結果時會返回null。 |
Future submit(Runnable task,T result) |
方法的返回值對象是Future,經過Future.get() 獲取具體的返回值時,結果與方法的第二個參數result相等。 |
Future submit(Callable task) |
該方法的參數是一個Callable類型 的對象,方法有返回值。調用Future.get()獲取到值就是Callable接口的call()方法返回的值 |
能夠看到,submit()的三個重載方法的返回值均是Future類型的對象,那麼Future又是何方神聖呢?併發
當任務提交到線程池後,咱們可能須要獲取任務的返回值,或者想要知道任務有沒有執行完成,甚至有時候由於特殊狀況須要取消任務,那麼這個時候應該怎麼辦呢?框架
在JUC包下,爲咱們提供了一個工具類:Future
。Future是一個接口,它提供了5個方法,當一個任務經過submit()方法提交到線程池後,線程池會返回一個Future類型的對象,咱們能夠經過Future對象的這5個方法來獲取任務在線程池中的狀態。這些方法定義以下。異步
方法名 | 說明 |
---|---|
boolean cancel(boolean mayInterruptIfRunning) |
用來取消任務,mayInterruptIfRunning參數用來表示是否須要中斷線程,若是傳true,表示須要中斷線程,那麼就會將任務的狀態設置爲INTERRUPTING ;若是爲false,那麼就會將任務的狀態設置爲CANCELLED (關於任務的狀態INTERRUPTING 和CANCELLED 後面會說明) |
boolean isCancelled() |
判斷任務是否已經被取消了,返回true表示被取消了 |
boolean isDone() |
判斷任務是否已經完成 |
V get() |
獲取任務的返回值,會一直阻塞當前線程,直到獲取到任務的返回值 |
V get(long timeout, TimeUnit unit) |
以超時的形式獲取任務的返回值,若是在超時時間內沒獲取到任務的返回值,那麼拋出TimeoutException異常 |
Future接口有一個具體的實現類:FutureTask
。事實上線程池ThreadPoolExecutor的三個submit()重載方法,返回的Future類型的對象,都是FutureTask的實例對象。FutureTask的UML圖以下。工具
從UML圖上能夠看到,FutureTask是直接實現了RunnableFuture
接口,而RunnableFuture接口又繼承了Runnable和Future接口,所以FutureTask既是Runnable類型,又是Future類型。 當調用submit()方法來向線程池中提交任務時,不管提交的是Runnable類型的任務,仍是提交的是Callable類型的任務,最終都是將任務封裝成一個FutureTask對象。下面以Future<T> submit(Callable<T> task)
方法爲例,來看下源碼。源碼分析
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 調用newTaskFor()將Callable任務封裝成一個FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 執行任務
execute(ftask);
return ftask;
}
// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 直接new一個FutureTask對象
return new FutureTask<T>(callable);
}
複製代碼
當submit()方法提交的是Runnable任務時,會調用newTaskFor()方法的另外一個重載方法
來將任務封裝成一個FutureTask對象,因此說最終線程池中執行的都是FutureTask類型的任務。(注意,Runnable類型的對象,最終會經過Executors.callable()
方法,將Runnable對象封裝爲一個Callable類型的對象。Executors.callable()的原理是使用適配器模式
,適配器爲 RunnableAdapter
類)性能
知道了Future與這些類之間的關係後,下面就來分析下線程池是如何執行一個FutureTask任務的?以及又是如何經過Future.get()方法就能獲取到任務的返回值的?
想要弄明白Future接口的幾個方法的原理,那麼就必須先搞明白FutureTask的數據接口,以及其設計的原理。
既然想要在線程池外部經過其餘線程獲取到池中任務的狀態,而線程池中的任務都是FutureTask類型,那麼在FutureTask這個對象中,確定存在和任務狀態有關的變量。
在FutureTask中定義了十分重要的屬性。以下表所示。
屬性 | 含義 |
---|---|
int state | state變量用來保存任務的狀態,它的取值有0~6,7個值,每一個值分別表示不一樣的含義,具體含義見下方說明 |
Callable callable | callable表示的是咱們提交的任務,Runnable類型的任務會經過Executors.callable()來轉變爲Callable |
Object outcome | 用來保存Callable的call()方法的返回值 |
Thread runner | 保存執行當前任務的線程 |
WaitNode waiters | 用來保存等待獲取任務返回值的線程的等待隊列,當咱們在主線程中調用Future.get()方法時,就會將主線程封裝成一個WaitNode。當有多個線程同時調用Future.get()方法時,WaitNode會經過next屬性來維護一個鏈表 |
state的取值有7種。每種取值的含義以下代碼註釋。
private volatile int state;
// 任務的初始狀態,當新建一個FutureTask任務時,state值默認爲NEW
private static final int NEW = 0;
// 任務處於完成中,什麼是完成中呢?有兩種狀況
// 1. 任務正常被線程執行完成了,可是尚未將返回值賦值給outcome屬性
// 2. 任務在執行過程當中出現了異常,被捕獲了,而後處理異常了,在將異常對象賦值給outcome屬性以前
private static final int COMPLETING = 1;
// 任務正常被執行完成,並將任務的返回值賦值給outcome屬性以後,會處於NORMAL狀態
private static final int NORMAL = 2;
// 任務出了異常,並將異常對象賦值給outcome屬性以後
private static final int EXCEPTIONAL = 3;
// 調用cancle(false),任務被取消了
private static final int CANCELLED = 4;
// 調用cancle(true),任務取消,可是在線程中斷以前
private static final int INTERRUPTING = 5;
// 調用cancle(true),任務取消,可是在線程中斷以後
private static final int INTERRUPTED = 6;
複製代碼
雖然任務的狀態有7中取值,但大體能夠將其分爲三類:初始狀態、中間狀態、最終狀態。這些狀態的變化關係,以下圖所示。
NEW
;當任務被正常執行完成後,會先將任務的狀態設置爲COMPLETING
;而後將任務的返回值(即Callable的call()方法的返回值)賦值給FutureTask的outcome
屬性,當賦值完成後,再將任務的狀態設置爲NORMAL
。這是一個任務正常執行的流程,也就是對應圖中①
所示的線路。NEW
設置爲COMPLETING
;而後將異常對象賦值給outcome
屬性,當賦值完成後,再將任務狀態設置爲EXCEPTIONAL
。這是任務出現異常的狀況,也就是對應圖中②
所示的線路。NEW
設置爲CANCELLED
,也就是對應圖中③
所對應的路線。INTERRUPTING
;而後調用執行當前任務的線程的interrupt()
方法,最後再設置任務狀態爲INTERRUPTED
,也就是圖中④
所對應的線路。當調用submit()方法提交任務到線程池後,會先調用newTaskFor()
方法將任務封裝成一個FutureTask對象,而後調用execute()方法來執行任務。在execute()方法中會先啓動Worker線程,當線程啓動後,會調用線程的runWorker()方法。在runWorker()
方法中最終會調用到task.run()
方法,也就是FutureTask的run()方法。關於這一步詳細的源碼分析能夠參考這篇文章:線程池ThreadPoolExecutor的實現原理
下面只分析下FutureTask.run()方法。在run()方法中,最終會調用callable屬性的call()方法。當任務正常執行完後,會調用FutureTask的set()
方法來更新任務的狀態以及保存任務的返回值,最後喚醒獲取任務結果的處於等待中的線程。若是出現異常,將會調用setException()
方法來更新任務狀態,保存異常,喚醒等待中的線程。下面是run()方法的源碼,我對源碼進行了刪減,只保留了核心邏輯。
public void run() {
......
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執行任務
result = c.call();
ran = true;
} catch (Throwable ex) {
// 出異常時將state置爲EXCEPTIONAL
setException(ex);
}
if (ran)
// 設置任務狀態爲COMPLETING,而後保存返回值,最後再設置爲NORMAL
set(result);
}
} finally {
// 其餘處理
......
}
}
複製代碼
對於set()
和setException()
方法,比較簡單,就是經過CAS來更新任務的狀態,而後將任務的返回值賦值給outcome
屬性,最後調用finishCompletion()
方法喚醒waiters
這個屬性構成的等待隊列中的線程。(關於CAS相關的原理和知識,能夠參考這兩篇文章:初識CAS的實現原理 和 Unsafe類的源碼解讀以及使用場景)
接下來結合具體的源碼來分析下Future.get()
方法的執行過程。當調用Future.get()方法時,會調用FutureTask的get()方法。在get()方法,首先判斷任務有沒有完成,若是已經完成了,就直接返回結果,若是沒有完成,則進行等待。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 若是狀態處於NEW或者COMPLETING狀態,表示任務尚未執行完成,須要等待
if (s <= COMPLETING)
// awaitDone()進行等待
s = awaitDone(false, 0L);
// 返回結果
return report(s);
}
複製代碼
經過調用report(s)
方法返回結果,在report()方法中,會先判斷任務是否是處於NORMAL
狀態,即任務是不是被正常執行完成,只有正常執行完成了,纔會返回結果,不然拋出對應的異常。
private V report(int s) throws ExecutionException {
Object x = outcome;
// 只有任務正常結束時,纔會返回
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
複製代碼
當任務處於NEW或者COMPLETING
狀態時,表示任務正處於執行中或者任務的返回值尚未被賦值給outcome屬性,因此這個時候,還不能返回結果,所以須要進入等待狀態,即調用awaitDone()
方法。在awaitDone()方法中,有一個無限for循環,先判斷任務是不是處於COMPLETING狀態。若是處於COMPLETING狀態,就讓當前線程先放棄CPU的調度權(爲何要放棄CPU的調度權呢?由於從COMPLETING變爲NORMAL狀態,或者其餘狀態,是一段很短的過程,讓當前線程先放棄CPU的調度權,以便讓其餘線程獲得CPU資源,而CPU的時間片也是一段很短的時間,當下次線程在獲取到CPU資源的時候,此時任務的狀態大機率會變爲NORMAL或者其餘最終狀態,因爲代碼是處於for循環中的,因此會進入下一次循環)。若是當前任務不是處於COMPLETING狀態,就會讓線程進行park等待,具體是park超時等待呢,仍是非超時等待呢?由awaitDone()方法傳入的參數決定。(當調用park()方法後,這些線程又是在何時被喚醒的呢?當任務的狀態變爲最終狀態
後,會調用finishCompletion()
方法,喚醒這些處於等待中的線程。)
下面是awaitDone()方法的部分源碼,我對源碼進行了刪減,只保留的主要邏輯。
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
......
for (;;) {
......
// 任務處於COMPLETING中,就讓當前線程先暫時放棄CPU的執行權
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
......
else if (timed) {
// 超時時間計算
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 等待一段時間
LockSupport.parkNanos(this, nanos);
}
else
// 等待
LockSupport.park(this);
}
}
複製代碼