FutureTask是怎樣獲取到異步執行結果的?

  所謂異步任務,就是不在當前線程中進行執行,而是另外起一個線程讓其執行。那麼當前線程若是想拿到其執行結果,該怎麼辦呢?java

  若是咱們使用一個公共變量做爲結果容器,兩個線程共用這個值,那麼應該是能夠拿到結果的,可是這樣一來,對業務就會形成侵入干擾了,由於你始終得考慮將這個共享變量傳入到這個異步線程中去且要維持其安全性。安全

  咱們知道,Future.get() 能夠獲取異步執行的結果,那麼它是怎麼作到的呢?app

  要實現線程的數據交換,咱們按照進程間的通訊方式可知有: 管道、共享內存、Socket套接字。而同一個jvm的兩個線程通訊,全部線程共享內存區域,則必定是經過共享內存再簡單不過了。less

 

  本文將以 ThreadPoolExecutor 線程池 來解釋這個過程。異步

 

  首先,若是想要獲取一個線程的執行結果,須要調用  ThreadPoolExecutor.submit(Callable); 方法。而後該方法會返回一個 Future 對象,經過 Future.get(); 便可獲取結果了。jvm

  它具體是怎麼實現的呢?ide

 

1、首先,咱們來看一下 submit 過程

  僅爲返回了一個 Future<?> 的對象供下游調用!工具

    // AbstractExecutorService
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 包裝一層結果,RunnableFuture, 也實現了 Runnable 接口
        // 實際上就是 FutureTask 
        RunnableFuture<T> ftask = newTaskFor(task);
        // 而後交由 線程池進行調用任務了,即由 jvm 調用執行 Thread
        // 具體執行邏輯,在我以前的文章中也已經闡述,自行搜索
        execute(ftask);
        // 最後,把包裝對象返回便可
        return ftask;
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    
    // FutureTask 實例化
    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

 

2、異步線程如何執行?

  經過上面的分析,咱們能夠看到,異步線程的執行被包裝成了 FutureTask, 而java的異步線程執行都是由jvm調用Thread.run()進行, 因此異步起點也應該從這裏去找:oop

    // FutureTask.run()
    public void run() {
        // 不容許屢次執行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 直接調用 call() 方法,獲取返回結果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 執行異常,包裝異常信息
                    setException(ex);
                }
                // 將結果設置到當前的 FutureTask 實例變量 outcome 中,這樣當前線程就能夠獲取了
                // 設置結果時,會將 state 同時變動
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // 設置結果時,還不表明能夠直接獲取了,還有後續工做,因此設置爲 COMPLETING 中間態
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // 通知線程執行完成等後續工做
            finishCompletion();
        }
    }
    
    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        // 外部看起來是一個 for, 實際上只會執行一次, 目的是爲了保證內部的鎖獲取成功
        // 若是有其餘線程成功後, waiters也就會爲null, 從而自身也一塊兒退出了
        for (WaitNode q; (q = waiters) != null;) {
            // 保證更新的線程安全性
            // 只要鎖獲取成功,就會一次性更新完成,不會失敗
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    // 依次喚醒等待的線程
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    // 只有把全部 wait 線程都通知完後,才能夠退出
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        // 完成後鉤子方法,默認爲空,若是須要作特殊操做能夠自行復寫便可
        done();

        callable = null;        // to reduce footprint
    }
    // 簡單看一下異常信息的包裝,與 正常結束方法相似,只是將 outcome 設置爲了異常信息,完成狀態設置爲 EXCEPTIONAL
    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

  在上面這些實現中,咱們也會有點迷糊,我幹啥來了?ui

  無論怎麼樣,你明白一點,全部的執行結果都被放到 FutureTask 的 outcome 變量中了,咱們若是想要知道結果,那麼,只須要獲取這個變量就能夠了。

  固然,也不可能這麼簡單了,起碼你得知道何時獲取該變量是合適的才行!接下來!

 

3、如何獲取異步執行結果?

  固然是用戶調用 future.get() 獲取了!

    // Future.get()
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 只要狀態值小於 COMPLETING, 就說明任務還未完成, 去等待完成
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 只要等待完成, 再去把結果取回便可
        return report(s);
    }
    // 咱們先看一下結果的取回邏輯 report(), 果真不出意外的簡單, 只管取 outcome 便可
    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 正常執行完成, 直接返回
        if (s == NORMAL)
            return (V)x;
        // 此處會包含 CANCELLED/INTERRUPTING/INTERRUPTED
        if (s >= CANCELLED)
            throw new CancellationException();
        // 業務異常則會被包裝成 ExecutionException
        throw new ExecutionException((Throwable)x);
    }
    // 看到取結果這麼簡單,那麼 等待結束的邏輯的呢?看起來好像沒那麼簡單了
    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // 中斷則退出
            if (Thread.interrupted()) {
                // 因 q 是鏈表的頭,因此會移除全部的等待隊列,即中斷是對全部線程的
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 執行完成後,將線程置空便可,刪除工做會有其餘地方完成
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 正在處理結果,稍做等待便可
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 其餘狀況,先建立本身的等待標識,以便在下一次循環中進行入隊等待
            else if (q == null)
                q = new WaitNode();
            // 進行一次入隊等待,將 q 做爲頭節點
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 限時的等待,等待超時後,直接返回當前狀態便可
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 最長等待預約時間
                LockSupport.parkNanos(this, nanos);
            }
            // 此處進行無限期等待,但當被喚醒時,必定有狀態變動的時候,應該會在下一個週期結束循環
            else
                LockSupport.park(this);
        }
    }

  能夠看到,等待邏輯仍是有點多的,畢竟場景多。至此,咱們已經徹底看到了一個,如何獲取異步線程的執行結果實現了。總結下:

    1. 實現Runnable接口,由jvm進行線程調用;
    2. 包裝 Callable.call()方法,帶返回值,當線程被調起時,轉給 call() 方法執行,並返回結果;
    3. 將結果封裝到當前future實例中,以備查;
    4. 當用戶調用get()方法時,保證狀態完成狀況下,最快速地返回結果;

 

4、擴展: Future.get() vs Thread.join()

  Future.get()方法,一方面是爲了獲取異步線程的執行結果,另外一方面也作到了等待線程執行完成的效果。

  而 Thread.join() 則純粹是爲了等待異步線程執行完成,那它們有什麼殊途同歸之妙嗎?來看下

    // Thread.join(), 經過 isAlive() 判斷是否完成
    /**
     * Waits for this thread to die.
     *
     * <p> An invocation of this method behaves in exactly the same
     * way as the invocation
     *
     * <blockquote>
     * {@linkplain #join(long) join}{@code (0)}
     * </blockquote>
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final void join() throws InterruptedException {
        join(0);
    }

    /**
     * Waits at most {@code millis} milliseconds for this thread to
     * die. A timeout of {@code 0} means to wait forever.
     *
     * <p> This implementation uses a loop of {@code this.wait} calls
     * conditioned on {@code this.isAlive}. As a thread terminates the
     * {@code this.notifyAll} method is invoked. It is recommended that
     * applications not use {@code wait}, {@code notify}, or
     * {@code notifyAll} on {@code Thread} instances.
     *
     * @param  millis
     *         the time to wait in milliseconds
     *
     * @throws  IllegalArgumentException
     *          if the value of {@code millis} is negative
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        // 無限期等待
        if (millis == 0) {
            while (isAlive()) {
                // 這是個 native 方法,即由jvm進行控制
                // Thread 任務執行完成後,將進行 notifyAll()
                // 同理下面的限時等待
                wait(0);
            }
        } else {
            // 限時等待
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

  能夠看到, Thread.join() 的等待邏輯是依賴於 jvm 的調度的, 經過 wait/notify 機制實現。與 Future.get() 相比,它是在 以後的,且沒法獲取結果。

 

5、Runnable如何包裝成Callable ?

  Callable 其實就只是實現了一個 call() 方法而已,若是咱們只實現了 Runnable, 是否就拿不到返回值呢?並非,咱們能夠直接指定返回值對象或者不指定,使用Runnable進行submit();

    // 不指定返回值的 Runnable, 此處的返回值必定 void
    public Future<?> submit(Runnable task);
    // 指定返回值的 Runnable, 由 T 進行返回值接收
    public <T> Future<T> submit(Runnable task, T result);

  可是 Runnable 是怎麼變成 Callable 的呢?其實就是一個 適配器模式的應用,咱們來看一下!

    // AbstractExecutorService.submit()
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 明確返回值爲 Void
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    // 一樣使用 FutureTask 進行封裝,只是調用了不一樣的構造器
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    // FutureTask, 使用 Executors 工具類生成一個 callable, 屏蔽掉 Callable 與 Runnable 的差別 
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    // Executors 使用一個適合器類將 Runnable 封裝成 Callable
    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result.  This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return
     * @param <T> the type of the result
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    // 而 RunnableAdapter 也是很簡單, 僅將 call() 轉而調用 run() 方法便可
    /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

  簡單不失優雅!這就是,大巧不工!

  可是有一個點咱們能夠看到,那就是 result 的獲取,其實就是傳入什麼值,就返回值。而若是想在想要改變其結果,惟一的辦法是使 result 變量 對 Runnable.run() 可見,從而在 run() 方法中改變其值。這就看你怎麼用了!

相關文章
相關標籤/搜索