別再問我Runnable、Callable、Future、FutureTask有什麼關聯了

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

Runnable與Callable

衆所周知,當咱們使用線程來運行Runnable任務時,是不支持獲取返回值的,由於Runnable接口的run()方法使用void修飾的,方法不支持返回值。而在不少場景下,咱們一方面須要經過線程來異步執行任務,以便提高性能,另外一方面還指望能獲取到任務的執行結果。尤爲是在RPC框架中,異步獲取任務返回值,幾乎是每個RPC接口要實現的功能。這個時候,使用Runnable顯然就沒法知足咱們的需求了,所以Callable就出現了。編程

Callable與Runnable相似,它是一個接口,也只有一個方法:call(),不一樣的是Callable的call()方法有是有返回值的,返回值的類型是一個泛型,泛型由建立Callable對象時指定。微信

public interface Callable<V> {
    V call() throws Exception;
}
複製代碼

Runnable對象能夠傳入到Thread類的構造方法中,經過Thread來運行Runnable任務,而Callable接口則不能直接傳入到Thread中來運行,Callable接口一般結合線程池來使用。線程池ThreadPoolExecutor中除了提供execute()方法來提交任務之外,還提供了submit()的三個重載方法來提交任務,這三個方法均有返回值。 ThreadPoolExecutor類繼承了抽象類AbstractExecutorService,在AbstractExecutorService中定義了submit()重載的三個方法。具體定義以下。數據結構

方法名 說明
Future<?> submit(Runnable task) 該方法雖然返回值對象是Future,可是因爲提交的是Runnable類型的任務,因此使用Future.get()獲取結果時會返回null。
Future submit(Runnable task,T result) 方法的返回值對象是Future,經過Future.get()獲取具體的返回值時,結果與方法的第二個參數result相等。
Future submit(Callable task) 該方法的參數是一個Callable類型的對象,方法有返回值。調用Future.get()獲取到值就是Callable接口的call()方法返回的值

能夠看到,submit()的三個重載方法的返回值均是Future類型的對象,那麼Future又是何方神聖呢?併發

Future與FutureTask

當任務提交到線程池後,咱們可能須要獲取任務的返回值,或者想要知道任務有沒有執行完成,甚至有時候由於特殊狀況須要取消任務,那麼這個時候應該怎麼辦呢?框架

在JUC包下,爲咱們提供了一個工具類:Future。Future是一個接口,它提供了5個方法,當一個任務經過submit()方法提交到線程池後,線程池會返回一個Future類型的對象,咱們能夠經過Future對象的這5個方法來獲取任務在線程池中的狀態。這些方法定義以下。異步

方法名 說明
boolean cancel(boolean mayInterruptIfRunning) 用來取消任務,mayInterruptIfRunning參數用來表示是否須要中斷線程,若是傳true,表示須要中斷線程,那麼就會將任務的狀態設置爲INTERRUPTING;若是爲false,那麼就會將任務的狀態設置爲CANCELLED(關於任務的狀態INTERRUPTINGCANCELLED後面會說明)
boolean isCancelled() 判斷任務是否已經被取消了,返回true表示被取消了
boolean isDone() 判斷任務是否已經完成
V get() 獲取任務的返回值,會一直阻塞當前線程,直到獲取到任務的返回值
V get(long timeout, TimeUnit unit) 以超時的形式獲取任務的返回值,若是在超時時間內沒獲取到任務的返回值,那麼拋出TimeoutException異常

Future接口有一個具體的實現類:FutureTask。事實上線程池ThreadPoolExecutor的三個submit()重載方法,返回的Future類型的對象,都是FutureTask的實例對象。FutureTask的UML圖以下。工具

FutureTask類的UML圖

從UML圖上能夠看到,FutureTask是直接實現了RunnableFuture接口,而RunnableFuture接口又繼承了Runnable和Future接口,所以FutureTask既是Runnable類型,又是Future類型。 當調用submit()方法來向線程池中提交任務時,不管提交的是Runnable類型的任務,仍是提交的是Callable類型的任務,最終都是將任務封裝成一個FutureTask對象。下面以Future<T> submit(Callable<T> task)方法爲例,來看下源碼。源碼分析

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 調用newTaskFor()將Callable任務封裝成一個FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 執行任務
    execute(ftask);
    return ftask;
}

// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // 直接new一個FutureTask對象
    return new FutureTask<T>(callable);
}
複製代碼

當submit()方法提交的是Runnable任務時,會調用newTaskFor()方法的另外一個重載方法來將任務封裝成一個FutureTask對象,因此說最終線程池中執行的都是FutureTask類型的任務。(注意,Runnable類型的對象,最終會經過Executors.callable()方法,將Runnable對象封裝爲一個Callable類型的對象。Executors.callable()的原理是使用適配器模式,適配器爲 RunnableAdapter類)性能

知道了Future與這些類之間的關係後,下面就來分析下線程池是如何執行一個FutureTask任務的?以及又是如何經過Future.get()方法就能獲取到任務的返回值的?

設計原理與數據結構

想要弄明白Future接口的幾個方法的原理,那麼就必須先搞明白FutureTask的數據接口,以及其設計的原理。

既然想要在線程池外部經過其餘線程獲取到池中任務的狀態,而線程池中的任務都是FutureTask類型,那麼在FutureTask這個對象中,確定存在和任務狀態有關的變量。

在FutureTask中定義了十分重要的屬性。以下表所示。

屬性 含義
int state state變量用來保存任務的狀態,它的取值有0~6,7個值,每一個值分別表示不一樣的含義,具體含義見下方說明
Callable callable callable表示的是咱們提交的任務,Runnable類型的任務會經過Executors.callable()來轉變爲Callable
Object outcome 用來保存Callable的call()方法的返回值
Thread runner 保存執行當前任務的線程
WaitNode waiters 用來保存等待獲取任務返回值的線程的等待隊列,當咱們在主線程中調用Future.get()方法時,就會將主線程封裝成一個WaitNode。當有多個線程同時調用Future.get()方法時,WaitNode會經過next屬性來維護一個鏈表

state的取值有7種。每種取值的含義以下代碼註釋。

private volatile int state;
// 任務的初始狀態,當新建一個FutureTask任務時,state值默認爲NEW
private static final int NEW          = 0;
// 任務處於完成中,什麼是完成中呢?有兩種狀況
// 1. 任務正常被線程執行完成了,可是尚未將返回值賦值給outcome屬性
// 2. 任務在執行過程當中出現了異常,被捕獲了,而後處理異常了,在將異常對象賦值給outcome屬性以前
private static final int COMPLETING   = 1;
// 任務正常被執行完成,並將任務的返回值賦值給outcome屬性以後,會處於NORMAL狀態
private static final int NORMAL       = 2;
// 任務出了異常,並將異常對象賦值給outcome屬性以後
private static final int EXCEPTIONAL  = 3;
// 調用cancle(false),任務被取消了
private static final int CANCELLED    = 4;
// 調用cancle(true),任務取消,可是在線程中斷以前
private static final int INTERRUPTING = 5;
// 調用cancle(true),任務取消,可是在線程中斷以後
private static final int INTERRUPTED  = 6;
複製代碼

雖然任務的狀態有7中取值,但大體能夠將其分爲三類:初始狀態、中間狀態、最終狀態。這些狀態的變化關係,以下圖所示。

任務狀態變化

  • 當一個任務被提交到線程池後,它的初始狀態爲NEW;當任務被正常執行完成後,會先將任務的狀態設置爲COMPLETING;而後將任務的返回值(即Callable的call()方法的返回值)賦值給FutureTask的outcome屬性,當賦值完成後,再將任務的狀態設置爲NORMAL。這是一個任務正常執行的流程,也就是對應圖中所示的線路。
  • 當任務被提交到線程池後,線程在執行任務中出現了異常,那麼會現將任務的狀態由NEW設置爲COMPLETING;而後將異常對象賦值給outcome屬性,當賦值完成後,再將任務狀態設置爲EXCEPTIONAL。這是任務出現異常的狀況,也就是對應圖中所示的線路。
  • 當任務被提交到線程池後,若是調用Future對象的cancle()方法,當cancle()傳入的傳入的參數爲false時,會直接將任務的狀態由NEW設置爲CANCELLED,也就是對應圖中所對應的路線。
  • 當cancle()方法傳入的參數爲true時,會先將任務狀態設置爲INTERRUPTING;而後調用執行當前任務的線程的interrupt()方法,最後再設置任務狀態爲INTERRUPTED,也就是圖中所對應的線路。

源碼分析

當調用submit()方法提交任務到線程池後,會先調用newTaskFor()方法將任務封裝成一個FutureTask對象,而後調用execute()方法來執行任務。在execute()方法中會先啓動Worker線程,當線程啓動後,會調用線程的runWorker()方法。在runWorker()方法中最終會調用到task.run()方法,也就是FutureTask的run()方法。關於這一步詳細的源碼分析能夠參考這篇文章:線程池ThreadPoolExecutor的實現原理

下面只分析下FutureTask.run()方法。在run()方法中,最終會調用callable屬性的call()方法。當任務正常執行完後,會調用FutureTask的set()方法來更新任務的狀態以及保存任務的返回值,最後喚醒獲取任務結果的處於等待中的線程。若是出現異常,將會調用setException()方法來更新任務狀態,保存異常,喚醒等待中的線程。下面是run()方法的源碼,我對源碼進行了刪減,只保留了核心邏輯。

public void run() {
    ......
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 執行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 出異常時將state置爲EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                // 設置任務狀態爲COMPLETING,而後保存返回值,最後再設置爲NORMAL
                set(result);
        }
    } finally {
        // 其餘處理
        ......
    }
}
複製代碼

對於set()setException()方法,比較簡單,就是經過CAS來更新任務的狀態,而後將任務的返回值賦值給outcome屬性,最後調用finishCompletion()方法喚醒waiters這個屬性構成的等待隊列中的線程。(關於CAS相關的原理和知識,能夠參考這兩篇文章:初識CAS的實現原理Unsafe類的源碼解讀以及使用場景

接下來結合具體的源碼來分析下Future.get()方法的執行過程。當調用Future.get()方法時,會調用FutureTask的get()方法。在get()方法,首先判斷任務有沒有完成,若是已經完成了,就直接返回結果,若是沒有完成,則進行等待。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 若是狀態處於NEW或者COMPLETING狀態,表示任務尚未執行完成,須要等待
    if (s <= COMPLETING)
        // awaitDone()進行等待
        s = awaitDone(false, 0L);
    // 返回結果
    return report(s);
}
複製代碼

經過調用report(s)方法返回結果,在report()方法中,會先判斷任務是否是處於NORMAL狀態,即任務是不是被正常執行完成,只有正常執行完成了,纔會返回結果,不然拋出對應的異常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 只有任務正常結束時,纔會返回
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
複製代碼

當任務處於NEW或者COMPLETING狀態時,表示任務正處於執行中或者任務的返回值尚未被賦值給outcome屬性,因此這個時候,還不能返回結果,所以須要進入等待狀態,即調用awaitDone()方法。在awaitDone()方法中,有一個無限for循環,先判斷任務是不是處於COMPLETING狀態。若是處於COMPLETING狀態,就讓當前線程先放棄CPU的調度權(爲何要放棄CPU的調度權呢?由於從COMPLETING變爲NORMAL狀態,或者其餘狀態,是一段很短的過程,讓當前線程先放棄CPU的調度權,以便讓其餘線程獲得CPU資源,而CPU的時間片也是一段很短的時間,當下次線程在獲取到CPU資源的時候,此時任務的狀態大機率會變爲NORMAL或者其餘最終狀態,因爲代碼是處於for循環中的,因此會進入下一次循環)。若是當前任務不是處於COMPLETING狀態,就會讓線程進行park等待,具體是park超時等待呢,仍是非超時等待呢?由awaitDone()方法傳入的參數決定。(當調用park()方法後,這些線程又是在何時被喚醒的呢?當任務的狀態變爲最終狀態後,會調用finishCompletion()方法,喚醒這些處於等待中的線程。)

下面是awaitDone()方法的部分源碼,我對源碼進行了刪減,只保留的主要邏輯。

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    ......
    for (;;) {
        ......
        // 任務處於COMPLETING中,就讓當前線程先暫時放棄CPU的執行權
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        ......                               
        else if (timed) {
        	// 超時時間計算
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 等待一段時間
            LockSupport.parkNanos(this, nanos);
        }
        else
        	// 等待
            LockSupport.park(this);
    }
}
複製代碼

總結

  • 本文主要介紹了Runnable接口和Callable接口的區別,前者沒有返回值,能被Thread直接執行;後者有返回值,不能被Thread直接執行須要經過線程池來執行。
  • 接着介紹了Future接口的5個方法,以及它的實現類FutureTask的幾個重要屬性以及數據結構。不管是Runnable仍是Callable對象,當提交到線程池後,均是被封裝成一個FutureTask對象後執行。對於Future的使用場景,在Netty和Dubbo均有大量的應用。
  • 最後結合源碼詳細介紹了FutureTask的get()方法和run()方法的實現原理。

推薦

微信公衆號
相關文章
相關標籤/搜索