我靠(call) ,個人將來(Future)在哪裏???


點擊藍色「Java建設者 」關注我喲java

加個「星標」,及時閱讀最新技術文章web


這是Java建設者第 107 篇原創文章


你們好,我是 cxuan,以前一直在分享操做系統相關的文章,兜兜轉轉回到了 Java 文章分享,本篇文章是讀者投稿,來和你一塊兒聊一聊 Future ~算法

咱們你們都知道,在 Java 中建立線程主要有三種方式:設計模式

  • 繼承 Thread 類;
  • 實現 Runnable 接口;
  • 實現 Callable 接口。

然後二者的區別在於 Callable 接口中的 call() 方法能夠異步地返回一個計算結果 Future,而且通常須要配合 ExecutorService 來執行。這一套操做在代碼實現上彷佛也並不難,但是對於call()方法具體怎麼(被ExecutorService)執行的,以及 Future 這個結果是怎麼獲取的,卻又不是很清楚了。安全

那麼本篇文章,咱們就一塊兒來學習下 Callable 接口以及 Future 的使用,主要面向兩個問題:微信

  • 承載着具體任務的 call() 方法如何被執行的?
  • 任務的執行結果如何獲得?

你可能會說,這兩個難道不是一個問題嗎?任務執行了就會有返回結果,而返回結果也必定是任務執行了才返回的,難道還能返回一個其餘任務的結果麼??不要着急,耐心的看下去,你就會發現,這兩個還真的就是一個問題。app

本文將分爲兩個部分,第一部分分別介紹 任務執行、以及結果這三個概念在 Java API 中的實體和各自的繼承關係,第二部分經過一個簡單的例子回顧他們的用法,再理解下這兩個問題的答案。框架

Callable、Executor 與 Future

既然是一個任務被執行並返回結果,那麼咱們先來看看具體的任務,也就是 Callable 接口。less

任務:Callable

很是簡單,只包含一個有泛型「返回值」的 call() 方法,須要在最後返回定義類型的結果。若是任務沒有須要返回的結果,那麼將泛型 V 設爲 void 並return null;就能夠了。對比的是 Runnable,另外一個明顯的區別則是 Callable能夠拋出異常。異步

public interface Callable<V{
    call() throws Exception;
}


public interface Runnable {
    public abstract void run();
}

執行:ExecutorService

說到線程就少不了線程池,而說到線程池確定離不開 Executor 接口。下面這幅圖是 Executor 的框架,咱們經常使用的是其中的兩個具體實現類 ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor,在 Executors 類中經過靜態方法獲取。Executors 中包含了線程池以及線程工廠的構造,與 Executor 接口的關係相似於 Collection 接口和 Collections 類的關係。

那麼咱們自頂向下,從源碼上了解一下 Executor 框架,學習學習任務是如何被執行的。首先是 Executor 接口,其中只定義了 execute() 方法。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService 接口繼承了 Executor 接口,主要擴展了一系列的 submit() 方法以及對 executor 的終止和判斷狀態。以第一個<T> Future<T> submit(Callable<T> task);爲例,其中 task 爲用戶定義的執行的異步任務,Future 表示了任務的執行結果,泛型 T 表明任務結果的類型。

public interface ExecutorService extends Executor {

    void shutdown();                // 現有任務完成後中止線程池
 
    List<Runnable> shutdownNow();   // 當即中止線程池

    boolean isShutdown();           // 判斷是否已中止

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);        // 提交Callale任務

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    // 針對Callable集合的invokeAll()等方法
}

抽象類AbstractExecutorServiceThreadPoolExecutor 的基類,在下面的代碼中,它實現了ExecutorService 接口中的 submit() 方法。註釋中是對應的 newTaskFor() 方法的代碼,很是簡單,就是將傳入的Callable 或 Runnable 參數封裝成一個 FutureTask 對象。

// 1.第一個重載方法,參數爲Callable
public <T> Future<T> submit(Callable<T> task) {
  if (task == nullthrow new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  // return new FutureTask<T>(callable);
  execute(ftask);
  return ftask;
}

// 2.第二個重載方法,參數爲Runnable
public Future<?> submit(Runnable task) {
  if (task == nullthrow new NullPointerException();
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  // return new FutureTask<T>(task, null);
  execute(ftask);
  return ftask;
}

// 3.第三個重載方法,參數爲Runnable + 返回對象
public <T> Future<T> submit(Runnable task, T result) {
  if (task == nullthrow new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task, result);
  // return new FutureTask<T>(task, result);
  execute(ftask);
  return ftask;
}

那麼也就是說,不管傳入的是 Callable 仍是 Runnable,submit() 方法其實就作了三件事

具體來講,submit() 中首先生成了一個 RunnableFuture 引用的 FutureTask 實例,而後調用 execute() 方法來執行它,那麼咱們能夠推測 FutureTask 繼承自 RunnableFuture,而 RunnableFuture 又實現了 Runnable,由於execute() 的參數應爲 Runnable 類型。上面還涉及到了 FutureTask 的構造函數,也來看一下。

public FutureTask(Callable<V> callable) {
  this.callable = callable;
  this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
  this.callable = Executors.callable(runnable, result); // 經過適配器將runnable在call()中執行並返回result
  this.state = NEW;
}

FutureTask 共有兩個構造方法。第一個構造方法比較簡單,對應上面的第一個 submit(),採用組合的方式封裝Callable 並將狀態設爲NEW;而第二個構造方法對應上面的後兩個 submit() 重載,不一樣之處是首先使用了Executors.callable來將 Runnable 和 result 組合成 Callable,這裏採用了適配器RunnableAdapter implements Callable,巧妙地在 call() 中執行 Runnable 並返回結果。

static final class RunnableAdapter<Timplements Callable<T{
  final Runnable task;
  final T result;                // 返回的結果;顯然:須要在run()中賦值

  RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
  }
  public T call() {
    task.run();
    return result;
  }
}

在適配器設計模式中,一般包含目標接口 Target、適配器 Adapter 和被適配者 Adaptee 三類角色,其中目標接口表明客戶端(當前業務系統)所須要的功能,一般爲藉口或抽象類;被適配者爲現存的不能知足使用需求的類;適配器是一個轉換器,也稱 wrapper,用於給被適配者添加目標功能,使得客戶端能夠按照目標接口的格式正確訪問。對於 RunnableAdapter 來講,Callable 是其目標接口,而 Runnable 則是被適配者。RunnableAdapter 經過覆蓋 call() 方法使其可按照 Callable 的要求來使用,同時其構造方法中接收被適配者和目標對象,知足了 call() 方法有返回值的要求。

那麼總結一下 submit() 方法執行的流程,就是:「Callable 被封裝在 Runnable 的子類中傳入 execute() 得以執行」

結果:Future

要說 Future 就是異步任務的執行結果其實並不許確,由於它表明了一個任務的執行過程,有狀態、能夠被取消,而 get() 方法的返回值纔是任務的結果。

public interface Future<V{

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    get() throws InterruptedException, ExecutionException;

    get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException
;
}

咱們在上面中還提到了 RuunableFuture 和 FutureTask。從官方的註釋來看,RuunableFuture 就是一個能夠 run的 future,實現了 Runnable 和 Future 兩個接口,在 run() 方法中執行完計算時應該將結果保存起來以便經過 get()獲取。

public interface RunnableFuture<Vextends RunnableFuture<V{
    /**
     * Sets this Future to the result of its computation unless it has been cancelled.
     */

    void run();
}

FutureTask 直接實現了 RunnableFuture 接口,做爲執行過程,共有下面這幾種狀態,其中 COMPLETING 爲一個暫時狀態,表示正在設置結果或異常,對應的,設置完成後狀態變爲 NORMAL 或 EXCEPTIONAL;CANCELLED、INTERRUPTED 表示任務被取消或中斷。在上面的構造方法中,將 state 初始化爲 NEW。

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

而後是 FutureTask 的主要內容,主要是 run() 和 get()。注意 outcome 的註釋,不管是否發生異常返回的都是這個 outcome,由於在執行中若是執行成功就將結果設置給了它(set()),而發生異常時將異常賦給了他(setException()),而在獲取結果時也都返回了 outcome(經過report())。

public class FutureTask<Vimplements RunnableFuture<V{
    
    private Callable<V> callable;         // target,待執行的任務
    
    /** 保存執行結果或異常,在get()方法中返回/拋出 */
    private Object outcome; // 非volatile,經過CAS保證線程安全
    
    
    public void run() {
        ......
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();            // 調用call()執行用戶任務並獲取結果
                ran = true;                   // 執行完成,ran置爲true
            } catch (Throwable ex) {          // 調用call()出現異常,而run()方法繼續執行
                 result = null;
                 ran = false;
                 setException(ex);            
                 // setException(Throwable t): compareAndSwapInt(NEW, COMPLETING);  outcome = t;      
            }
            if (ran)
                set(result);                  
             // set(V v): compareAndSwapInt(NEW, COMPLETING);  outcome = v;
        }
    }
    
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false0L);         // 加入隊列等待COMPLETING完成,可響應超時、中斷
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException 
{
        // 超時等待
    }
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)                              // 將outcome做爲執行結果返回
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);   // 將outcome做爲捕獲的返回
    }
}

FutureTask 實現了 RunnableFuture 接口,因此有兩方面的做用。

  • 第一,做爲 Runnable 傳入 execute() 方法來執行,同時封裝 Callable 對象並在 run() 中調用其 call() 方法;
  • 第二,做爲 Future 管理任務的執行狀態,將 call() 的返回值保存在 outcome 中以經過 get() 獲取。這彷佛就能回答開頭的兩個問題,而且渾然天成,就好像是一個問題,除非發生異常的時候返回的不是任務的結果而是異常對象。

總結一下繼承關係:

2、使用舉例

文章的標題有點唬人,說到底仍是講 Callable 的用法。如今咱們知道了 Future 表明了任務執行的過程和結果,做爲 call() 方法的返回值來獲取執行結果;而 FutureTask 是一個 Runnable 的 Future,既是任務執行的過程和結果,又是 call 方法最終執行的載體。下面經過一個例子看看他們在使用上的區別。

首先建立一個任務,即定義一個任務類實現 Callable 接口,在 call() 方法裏添加咱們的操做,這裏用耗時三秒而後返回 100 模擬計算過程。

class MyTask implements Callable<Integer{
    @Override
    public Integer call() throws Exception {
        System.out.println("子線程開始計算...");
        for (int i=0;i<3;++i){
            Thread.sleep(1000);
            System.out.println("子線程計算中,用時 "+(i+1)+" 秒");
        }
        System.out.println("子線程計算完成,返回:100");
        return 100;
    }
}

而後呢,建立一個線程池,並實例化一個 MyTask 備用。

ExecutorService executor = Executors.newCachedThreadPool();
MyTask task = new MyTask();

如今,分別使用 Future 和 FutureTask 來獲取執行結果,看看他們有什麼區別。

使用Future

Future 通常做爲 submit() 的返回值使用,並在主線程中以阻塞的方式獲取異步任務的執行結果。

System.out.println("主線程啓動線程池");
Future<Integer> future = executor.submit(task);
System.out.println("主線程獲得返回結果:"+future.get());
executor.shutdown();

看看輸出結果:

主線程啓動線程池
子線程開始計算...
子線程計算中,用時 1 秒
子線程計算中,用時 2 秒
子線程計算中,用時 3 秒
子線程計算完成,返回:100
主線程獲得返回結果:100

因爲 get() 方法阻塞獲取結果,因此輸出順序爲子線程計算完成後主線程輸出結果。

使用FutureTask

因爲 FutureTask 集「任務與結果」於一身,因此咱們可使用 FutureTask 自身而非返回值來管理任務,這須要首先利用 Callable 對象來構造 FutureTask,並調用不一樣的submit()重載方法。

System.out.println("主線程啓動線程池");
FutureTask<Integer> futureTask = new FutureTask<>(task);
executor.submit(futureTask);                                 // 做爲Ruunable傳入submit()中
System.out.println("主線程獲得返回結果:"+futureTask.get());    // 做爲Future獲取結果
executor.shutdown();

這段程序的輸出與上面中徹底相同,其實二者在實際執行中的區別也不大,雖然前者調用了submit(Callable<T> task)然後者調用了submit(Runnable task),但最終都經過execute(futuretask)來把任務加入線程池中。

總結

上面大費周章其實只是儘量細緻地講清楚了 Callable 中的任務是如何執行的,總結起來就是:

  • 線程池中,submit() 方法實際上將 Callable 封裝在 FutureTask 中,將其做爲 Runnable 的子類傳給 execute()真正執行;
  • FutureTask 在 run() 中調用 Callable 對象的 call() 方法並接收返回值或捕獲異常保存在 Object outcome 中,同時管理執行過程當中的狀態 state
  • FutureTask 同時做爲 Future 的子類,經過 get() 返回任務的執行結果,若未執行完成則經過等待隊列進行阻塞等待完成;

FutureTask 做爲一個 Runnable 的 Future,其中最重要的兩個方法以下。


往期精選

再見,Navicat!同事安利的這個IDEA的兄弟,真香!

Linux 是如何管理內存的?

最棒 Spring Boot 乾貨總結 !

換人!這些算法都不會還學什麼操做系統



本文分享自微信公衆號 - Java建設者(javajianshe)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索