深刻學習 FutureTask

轉自:http://www.importnew.com/25286.htmlhtml

第一部分:What

在Java中通常經過繼承Thread類或者實現Runnable接口這兩種方式來建立多線程,可是這兩種方式都有個缺陷,就是不能在執行完成後獲取執行的結果,所以Java 1.5以後提供了Callable和Future接口,經過它們就能夠在任務執行完畢以後獲得任務的執行結果。本文會簡要的介紹使用方法,而後會從源代碼角度分析下具體的實現原理。
本文以Java 1.7的代碼進行分析。java

第二部分:How

Callable接口多線程

對於須要執行的任務須要實現Callable接口,Callable接口定義以下:異步

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

 

能夠看到Callable是個泛型接口,泛型V就是要call()方法返回的類型。Callable接口和Runnable接口很像,均可以被另一個線程執行,可是正如前面所說的,Runnable不會返回數據也不能拋出異常。ide

Future接口函數

Future接口表明異步計算的結果,經過Future接口提供的方法能夠查看異步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還能夠取消執行。Future接口的定義以下:this

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 

  • cancel():cancel()方法用來取消異步任務的執行。若是異步任務已經完成或者已經被取消,或者因爲某些緣由不能取消,則會返回false。若是任務尚未被執行,則會返回true而且異步任務不會被執行。若是任務已經開始執行了可是尚未執行完成,若mayInterruptIfRunning爲true,則會當即中斷執行任務的線程並返回true,若mayInterruptIfRunning爲false,則會返回true且不會中斷任務執行線程。
  • isCanceled():判斷任務是否被取消,若是任務在結束(正常執行結束或者執行異常結束)前被取消則返回true,不然返回false。
  • isDone():判斷任務是否已經完成,若是完成則返回true,不然返回false。須要注意的是:任務執行過程當中發生異常、任務被取消也屬於任務已完成,也會返回true。
  • get():獲取任務執行結果,若是任務還沒完成則會阻塞等待直到任務執行完成。若是任務被取消則會拋出CancellationException異常,若是任務執行過程發生異常則會拋出ExecutionException異常,若是阻塞等待過程當中被中斷則會拋出InterruptedException異常。
  • get(long timeout,Timeunit unit):帶超時時間的get()版本,若是阻塞等待過程當中超時則會拋出TimeoutException異常。

FutureTaskspa

Future只是一個接口,不能直接用來建立對象,FutureTask是Future的實現類,
FutureTask的繼承圖以下:線程

能夠看到,FutureTask實現了RunnableFuture接口,則RunnableFuture接口繼承了Runnable接口和Future接口,因此FutureTask既能當作一個Runnable直接被Thread執行,也能做爲Future用來獲得Callable的計算結果。指針

使用

FutureTask通常配合ExecutorService來使用,也能夠直接經過Thread來使用。

package com.beautyboss.slogen.callback;
 
import java.util.concurrent.*;
 
/**
 * Author  : Slogen
 * AddTime : 17/6/4
 * Email   : huangjian13@meituan.com
 */
public class CallDemo {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
 
        /**
         * 第一種方式:Future + ExecutorService
         * Task task = new Task();
         * ExecutorService service = Executors.newCachedThreadPool();
         * Future<Integer> future = service.submit(task1);
         * service.shutdown();
         */
 
 
        /**
         * 第二種方式: FutureTask + ExecutorService
         * ExecutorService executor = Executors.newCachedThreadPool();
         * Task task = new Task();
         * FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
         * executor.submit(futureTask);
         * executor.shutdown();
         */
 
        /**
         * 第三種方式:FutureTask + Thread
         */
 
        // 2. 新建FutureTask,須要一個實現了Callable接口的類的實例做爲構造函數參數
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
        // 3. 新建Thread對象並啓動
        Thread thread = new Thread(futureTask);
        thread.setName("Task thread");
        thread.start();
 
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
 
        // 4. 調用isDone()判斷任務是否結束
        if(!futureTask.isDone()) {
            System.out.println("Task is not done");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int result = 0;
        try {
            // 5. 調用get()方法獲取任務結果,若是任務沒有執行完成則阻塞等待
            result = futureTask.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        System.out.println("result is " + result);
 
    }
 
    // 1. 繼承Callable接口,實現call()方法,泛型參數爲要返回的類型
    static class Task  implements Callable<Integer> {
 
        @Override
        public Integer call() throws Exception {
            System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
            int result = 0;
            for(int i = 0; i < 100;++i) {
                result += i;
            }
 
            Thread.sleep(3000);
            return result;
        }
    }
}

 

第三部分:Why

構造函數

先從FutureTask的構造函數看起,FutureTask有兩個構造函數,其中一個以下:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
}

 這個構造函數會把傳入的Callable變量保存在this.callable字段中,該字段定義爲private Callable<V> callable;用來保存底層的調用,在被執行完成之後會指向null,接着會初始化state字段爲NEW。state字段用來保存FutureTask內部的任務執行狀態,一共有7中狀態,每種狀態及其對應的值以下:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

 

其中須要注意的是state是volatile類型的,也就是說只要有任何一個線程修改了這個變量,那麼其餘全部的線程都會知道最新的值。

爲了後面更好的分析FutureTask的實現,這裏有必要解釋下各個狀態。

  • NEW:表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
  • COMPLETING:任務已經執行完成或者執行任務的時候發生異常,可是任務執行結果或者異常緣由尚未保存到outcome字段(outcome字段用來保存任務執行結果,若是發生異常,則用來保存異常緣由)的時候,狀態會從NEW變動到COMPLETING。可是這個狀態會時間會比較短,屬於中間狀態。
  • NORMAL:任務已經執行完成而且任務執行結果已經保存到outcome字段,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。
  • EXCEPTIONAL:任務執行發生異常而且異常緣由已經保存到outcome字段中後,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。
  • CANCELLED:任務還沒開始執行或者已經開始執行可是尚未執行完成的時候,用戶調用了cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從NEW轉化爲CANCELLED狀態。這是一個最終態。
  • INTERRUPTING: 任務還沒開始執行或者已經執行可是尚未執行完成的時候,用戶調用了cancel(true)方法取消任務而且要中斷任務執行線程可是尚未中斷任務執行線程以前,狀態會從NEW轉化爲INTERRUPTING。這是一箇中間狀態。
  • INTERRUPTED:調用interrupt()中斷任務執行線程以後狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。

有一點須要注意的是,全部值大於COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。

各個狀態之間的可能轉換關係以下圖所示:

另一個構造函數以下,

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}

 

這個構造函數會把傳入的Runnable封裝成一個Callable對象保存在callable字段中,同時若是任務執行成功的話就會返回傳入的result。這種狀況下若是不須要返回值的話能夠傳入一個null。

順帶看下Executors.callable()這個方法,這個方法的功能是把Runnable轉換成Callable,代碼以下:

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

 能夠看到這裏採用的是適配器模式,調用RunnableAdapter<T>(task, result)方法來適配,實現以下:

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

 

這個適配器很簡單,就是簡單的實現了Callable接口,在call()實現中調用Runnable.run()方法,而後把傳入的result做爲任務的結果返回。

在new了一個FutureTask對象以後,接下來就是在另外一個線程中執行這個Task,不管是經過直接new一個Thread仍是經過線程池,執行的都是run()方法,接下來就看看run()方法的實現。

run()

run()方法實現以下:

public void run() {
    // 1. 狀態若是不是NEW,說明任務或者已經執行過,或者已經被取消,直接返回
    // 2. 狀態若是是NEW,則嘗試把當前執行線程保存在runner字段中
    // 若是賦值失敗則直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 3. 執行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 4. 任務異常
                setException(ex);
            }
            if (ran)
                // 4. 任務正常執行完畢
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 5. 若是任務被中斷,執行中斷處理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

 

run()方法首先會

  1. 判斷當前任務的state是否等於NEW,若是不爲NEW則說明任務或者已經執行過,或者已經被取消,直接返回。
  2. 若是狀態爲NEW則接着會經過unsafe類把任務執行線程引用CAS的保存在runner字段中,若是保存失敗,則直接返回。
  3. 執行任務。

若是任務執行發生異常,則調用setException()方法保存異常信息。setException()方法以下:

 

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

 

在setException()方法中

  • 首先會CAS的把當前的狀態從NEW變動爲COMPLETING狀態。
  • 把異常緣由保存在outcome字段中,outcome字段用來保存任務執行結果或者異常緣由。
  • CAS的把當前任務狀態從COMPLETING變動爲EXCEPTIONAL。這個狀態轉換對應着上圖中的二。
  • 調用finishCompletion()。關於這個方法後面在分析。

5. 若是任務成功執行則調用set()方法設置執行結果,該方法實現以下:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

 

這個方法跟上面分析的setException()差很少,

  • 首先會CAS的把當前的狀態從NEW變動爲COMPLETING狀態。
  • 把任務執行結果保存在outcome字段中。
  • CAS的把當前任務狀態從COMPLETING變動爲NORMAL。這個狀態轉換對應着上圖中的一。
  • 調用finishCompletion()。

發起任務線程跟執行任務線程一般狀況下都不會是同一個線程,在任務執行線程執行任務的時候,任務發起線程能夠查看任務執行狀態、獲取任務執行結果、取消任務等等操做,接下來分析下這些操做。

get()

任務發起線程能夠調用get()方法來獲取任務執行結果,若是此時任務已經執行完畢則會直接返回任務結果,若是任務還沒執行完畢,則調用方會阻塞直到任務執行結束返回結果爲止。get()方法實現以下:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

 

get()方法實現比較簡單,會

  1. 判斷任務當前的state <= COMPLETING是否成立。前面分析過,COMPLETING狀態是任務是否執行完成的臨界狀態。
  2. 若是成立,代表任務尚未結束(這裏的結束包括任務正常執行完畢,任務執行異常,任務被取消),則會調用awaitDone()進行阻塞等待。
  3. 若是不成立代表任務已經結束,調用report()返回結果。

awaitDone()

當調用get()獲取任務結果可是任務還沒執行完成的時候,調用線程會調用awaitDone()方法進行阻塞等待,該方法定義以下:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    // 計算等待截止時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 1. 判斷阻塞線程是否被中斷,若是被中斷則在等待隊
        // 列中刪除該節點並拋出InterruptedException異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
 
        // 2. 獲取當前狀態,若是狀態大於COMPLETING
        // 說明任務已經結束(要麼正常結束,要麼異常結束,要麼被取消)
        // 則把thread顯示置空,並返回結果
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 3. 若是狀態處於中間狀態COMPLETING
        // 表示任務已經結束可是任務執行線程還沒來得及給outcome賦值
        // 這個時候讓出執行權讓其餘線程優先執行
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 4. 若是等待節點爲空,則構造一個等待節點
        else if (q == null)
            q = new WaitNode();
        // 5. 若是尚未入隊列,則把當前節點加入waiters首節點並替換原來waiters
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {
            // 若是須要等待特定時間,則先計算要等待的時間
            // 若是已經超時,則刪除對應節點並返回對應的狀態
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 6. 阻塞等待特定時間
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 6. 阻塞等待直到被其餘線程喚醒
            LockSupport.park(this);
    }
}

 

awaitDone()中有個死循環,每一次循環都會

  1. 判斷調用get()的線程是否被其餘線程中斷,若是是的話則在等待隊列中刪除對應節點而後拋出InterruptedException異常。
  2. 獲取任務當前狀態,若是當前任務狀態大於COMPLETING則表示任務執行完成,則把thread字段置null並返回結果。
  3. 若是任務處於COMPLETING狀態,則表示任務已經處理完成(正常執行完成或者執行出現異常),可是執行結果或者異常緣由尚未保存到outcome字段中。這個時候調用線程讓出執行權讓其餘線程優先執行。
  4. 若是等待節點爲空,則構造一個等待節點WaitNode。
  5. 若是第四步中新建的節點還沒如隊列,則CAS的把該節點加入waiters隊列的首節點。
  6. 阻塞等待。

假設當前state=NEW且waiters爲NULL,也就是說尚未任何一個線程調用get()獲取執行結果,這個時候有兩個線程threadA和threadB前後調用get()來獲取執行結果。再假設這兩個線程在加入阻塞隊列進行阻塞等待以前任務都沒有執行完成且threadA和threadB都沒有被中斷的狀況下(由於若是threadA和threadB在進行阻塞等待結果以前任務就執行完成或線程自己被中斷的話,awaitDone()就執行結束返回了),執行過程是這樣的,以threadA爲例:

  1. 第一輪for循環,執行的邏輯是q == null,因此這時候會新建一個節點q。第一輪循環結束。
  2. 第二輪for循環,執行的邏輯是!queue,這個時候會把第一輪循環中生成的節點的netx指針指向waiters,而後CAS的把節點q替換waiters。也就是把新生成的節點添加到waiters鏈表的首節點。若是替換成功,queued=true。第二輪循環結束。
  3. 第三輪for循環,進行阻塞等待。要麼阻塞特定時間,要麼一直阻塞知道被其餘線程喚醒

在threadA和threadB都阻塞等待以後的waiters結果如圖

cancel()方法會作下面幾件事:

1 .判斷任務當前執行狀態,若是任務狀態不爲NEW,則說明任務或者已經執行完成,或者執行異常,不能被取消,直接返回false表示執行失敗。

2. 判斷須要中斷任務執行線程,則

  • 把任務狀態從NEW轉化到INTERRUPTING。這是個中間狀態。
  • 中斷任務執行線程。
  • 修改任務狀態爲INTERRUPTED。這個轉換過程對應上圖中的四。

3. 若是不須要中斷任務執行線程,直接把任務狀態從NEW轉化爲CANCELLED。若是轉化失敗則返回false表示取消失敗。這個轉換過程對應上圖中的四。

4. 調用finishCompletion()。

finishCompletion()

根據前面的分析,不論是任務執行異常仍是任務正常執行完畢,或者取消任務,最後都會調用finishCompletion()方法,該方法實現以下:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
 
    done();
 
    callable = null;        // to reduce footprint
}

 

這個方法的實現比較簡單,依次遍歷waiters鏈表,喚醒節點中的線程,而後把callable置空。
被喚醒的線程會各自從awaitDone()方法中的LockSupport.park*()阻塞中返回,而後會進行新一輪的循環。在新一輪的循環中會返回執行結果(或者更確切的說是返回任務的狀態)。

report()

report()方法用在get()方法中,做用是把不一樣的任務狀態映射成任務執行結果。實現以下:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 1. 任務正常執行完成,返回任務執行結果
    if (s == NORMAL)
        return (V)x;
    // 2. 任務被取消,拋出CancellationException異常
    if (s >= CANCELLED)
        throw new CancellationException();
    // 3. 其餘狀態,拋出執行異常ExecutionException
    throw new ExecutionException((Throwable)x);
}

 映射關係以下圖所示:

若是任務處於NEW、COMPLETING和INTERRUPTING這三種狀態的時候是執行不到report()方法的,因此不必對這三種狀態進行轉換。

get(long,TimeUnit)

帶超時等待的獲取任務結果,實現以下:

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        // 若是awaitDone()超時返回以後任務還沒結束,則拋出異常
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

 

跟get()不一樣點在於get(long,TimeUnit)會在awaitDone()超時返回以後拋出TimeoutException異常。

isCancelled()和isDone()

這兩個方法分別用來判斷任務是否被取消和任務是否執行完成,實現都比較簡單,代碼以下:

public boolean isCancelled() {
    return state >= CANCELLED;
}

	
public boolean isDone() {
    return state != NEW;
}

 

根據前面的分析,這兩個方法就很容易理解不用多作解釋了,O(∩_∩)O。

總結下,其實FutureTask的實現仍是比較簡單的,當用戶實現Callable()接口定義好任務以後,把任務交給其餘線程進行執行。FutureTask內部維護一個任務狀態,任何操做都是圍繞着這個狀態進行,並隨時更新任務狀態。任務發起者調用get*()獲取執行結果的時候,若是任務尚未執行完畢,則會把本身放入阻塞隊列中而後進行阻塞等待。當任務執行完成以後,任務執行線程會依次喚醒阻塞等待的線程。調用cancel()取消任務的時候也只是簡單的修改任務狀態,若是須要中斷任務執行線程的話則調用Thread.interrupt()中斷任務執行線程。

第四部分:Other

有個值得關注的問題就是當任務還在執行的時候用戶調用cancel(true)方法可否真正讓任務中止執行呢?
在前面的分析中咱們直到,當調用cancel(true)方法的時候,實際執行仍是Thread.interrupt()方法,而interrupt()方法只是設置中斷標誌位,若是被中斷的線程處於sleep()、wait()或者join()邏輯中則會拋出InterruptedException異常。

所以結論是:cancel(true)並不必定可以中止正在執行的異步任務。

相關文章
相關標籤/搜索