Future 和 FutureTask 源碼詳解

Future 概述

Future 就像它字面上的意思差很少就是「將來」的意思。在實際開發場景中有時須要異步調用一些接口或函數,可是這些接口或函數都不會當即返回結果且該結果不會影響後續程序的執行,這個時候使用Future就很是的適合,舉例來講:bash

如查一個數據集合,第一頁至第一百頁,返回總頁數的總結集,而後導出。一次須要limit 0 10000,這樣,一個SQL查詢出很是慢。但用100個線程,一個線程只查limit0 10 就很是快了, 利用多線程的特性,返回多個集合,在順序合併成總集合。多線程


經常使用方法源碼分析

構造方法

Future 是一個接口而 FutureTask 是它的經常使用實現類。異步

FutureTask(Callable callable)函數

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable 設置任務的初始化狀態
    }
複製代碼

FutureTask(Runnable runnable, V result)源碼分析

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result); // 調用該方法生成一個callable
        this.state = NEW;       // ensure visibility of callable
    }
複製代碼

該方法最終是經過 RunnableAdapter 來生成的 callableui

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
複製代碼

這裏實現了 Callable 接口this


run 方法

FutureTask<HashMap<String,String>> future = new FutureTask<>(new Callable<HashMap<String,String>>() 
{   這裏實現call方法 }
new Thread(future).start();
複製代碼

這裏的 start 方法會去調用 FutureTask 中 run 方法,這裏就不講這裏如何要調用 run 方法了,想了解細節的話能夠去看一下 Thread 的 start 方法裏面講的很清楚。在講 run 方法以前先講一下線程任務的運行狀態:spa

/** 
	 * 狀態之間的轉換以下:
	 * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0; // 初始化狀態
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常
    private static final int EXCEPTIONAL  = 3; // 異常
    private static final int CANCELLED    = 4; // 取消
    private static final int INTERRUPTING = 5; // 中斷中
    private static final int INTERRUPTED  = 6; // 已中斷
複製代碼

下面開始詳細講解 run 方法:線程

public void run() {
        if (state != NEW || // 這裏的 NEW 表明的是該線程任務的運行狀態,此時表明的意思是初始化
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread())) // compareAndSwapObject 設置當前線程爲 runner 線程
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // 調用失敗就講異常設置到返回結果中
                }
                if (ran)
                    set(result);  // 調用成功則執行 set 講執行結果設置到返回結果中
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
複製代碼

這裏須要講一下 set(result) 這個方法code

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  // 設置線程任務的狀態爲執行中狀態
            outcome = v; // 設置返回值
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 設置線程任務的最終狀態爲正常狀態
            finishCompletion(); // 移除等待的線程並對等待的線程進行喚醒操做及調用 done 並置空callable
        }
    }
複製代碼

這裏就不講 setException(ex) 這個方法了,由於它和 set 方法幾乎一致。可是在這裏咱們看到了一個 finishCompletion 方法,那這個方法具體是怎麼執行的呢?下面就來對它進行一個細緻的分析:

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 設置等待線程的頭節點置空
                for (;;) { // 自旋喚醒全部等待線程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
複製代碼

get 方法

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) // 判斷是否須要等待
            s = awaitDone(false, 0L);
        return report(s);
    }
複製代碼

等待方法 awaitDone

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) { // 判斷是否中斷
                removeWaiter(q); // 移除等待節點
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) { // 這裏大於 COMPLETING 表示已經執行完畢或者已經拋出異常
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield(); // 使當前線程從執行狀態(運行狀態)變爲可執行態(就緒狀態)。說人話就是隔一會執行。
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q); // 設置等待節點
            else if (timed) { // 這裏是設置了超時時間纔會調用
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
複製代碼

設置返回值 report 方法

private V report(int s) throws ExecutionException {
        Object x = outcome;  // 拿到返回值
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);  // 其餘的異常處理辦法
    }
複製代碼

總結

這裏主要講了 run 方法和 get 方法。在 run 方法中是經過拿取和改變 state 的狀態來判斷和執行具體代碼。在 run 方法中只要不出現異常,一開始 state 都是 NEW 。在是 NEW 的狀況下才會去 runner 設置爲當前線程。後續纔會去調用具體的 call 方法,無論是拋出異常或者是拿到正確的執行結果都會設置到返回值當中。在設置返回值,又會去更改 state 的狀態,以後就會對等待的線程依次進行喚醒(這裏可能也沒有等待線程,由於等待線程的出現是由於在異步執行的代碼還未執行完拿到結果,這邊就調用了 get 方法。此時就會出現等待線程),喚醒的線程就能拿到結果了。這裏的 get 方法中的總體邏輯就相對來講要簡單一些了,在 get 方法中首先經過 state 判斷是否須要等待,若要等待就調用 awaitDone 方法去加入等待(這裏會有邏輯去判斷是否取消、中斷、超時)。最後在調用 report 方法返回結果值(這裏有邏輯會對異常進行一個處理)。其實這裏都是經過改變和獲取 state 的狀態來對後續的操做進行一個判斷。

相關文章
相關標籤/搜索