


  咱們知道,Future.get() 能夠獲取異步執行的結果,那麼它是怎麼作到的呢?app

  要實現線程的數據交換,咱們按照進程間的通訊方式可知有: 管道、共享內存、Socket套接字。而同一個jvm的兩個線程通訊,全部線程共享內存區域,則必定是經過共享內存再簡單不過了。less


  本文將以 ThreadPoolExecutor 線程池 來解釋這個過程。異步


  首先,若是想要獲取一個線程的執行結果,須要調用  ThreadPoolExecutor.submit(Callable); 方法。而後該方法會返回一個 Future 對象,經過 Future.get(); 便可獲取結果了。jvm



1、首先,咱們來看一下 submit 過程

  僅爲返回了一個 Future<?> 的對象供下游調用!工具

    // AbstractExecutorService
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 包裝一層結果,RunnableFuture, 也實現了 Runnable 接口
        // 實際上就是 FutureTask 
        RunnableFuture<T> ftask = newTaskFor(task);
        // 而後交由 線程池進行調用任務了,即由 jvm 調用執行 Thread
        // 具體執行邏輯,在我以前的文章中也已經闡述,自行搜索
        // 最後,把包裝對象返回便可
        return ftask;

     * Returns a {@code RunnableFuture} for the given callable task.
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    // FutureTask 實例化
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable



  經過上面的分析,咱們能夠看到,異步線程的執行被包裝成了 FutureTask, 而java的異步線程執行都是由jvm調用Thread.run()進行, 因此異步起點也應該從這裏去找:oop

    // FutureTask.run()
    public void run() {
        // 不容許屢次執行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 直接調用 call() 方法,獲取返回結果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 執行異常,包裝異常信息
                // 將結果設置到當前的 FutureTask 實例變量 outcome 中,這樣當前線程就能夠獲取了
                // 設置結果時,會將 state 同時變動
                if (ran)
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     * @param v the value
    protected void set(V v) {
        // 設置結果時,還不表明能夠直接獲取了,還有後續工做,因此設置爲 COMPLETING 中間態
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // 通知線程執行完成等後續工做
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
    private void finishCompletion() {
        // assert state > COMPLETING;
        // 外部看起來是一個 for, 實際上只會執行一次, 目的是爲了保證內部的鎖獲取成功
        // 若是有其餘線程成功後, waiters也就會爲null, 從而自身也一塊兒退出了
        for (WaitNode q; (q = waiters) != null;) {
            // 保證更新的線程安全性
            // 只要鎖獲取成功,就會一次性更新完成,不會失敗
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    // 依次喚醒等待的線程
                    if (t != null) {
                        q.thread = null;
                    WaitNode next = q.next;
                    // 只有把全部 wait 線程都通知完後,才能夠退出
                    if (next == null)
                    q.next = null; // unlink to help gc
                    q = next;

        // 完成後鉤子方法,默認爲空,若是須要作特殊操做能夠自行復寫便可

        callable = null;        // to reduce footprint
    // 簡單看一下異常信息的包裝,與 正常結束方法相似,只是將 outcome 設置爲了異常信息,完成狀態設置爲 EXCEPTIONAL
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     * @param t the cause of failure
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state


  無論怎麼樣,你明白一點,全部的執行結果都被放到 FutureTask 的 outcome 變量中了,咱們若是想要知道結果,那麼,只須要獲取這個變量就能夠了。




  固然是用戶調用 future.get() 獲取了!

    // Future.get()
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 只要狀態值小於 COMPLETING, 就說明任務還未完成, 去等待完成
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 只要等待完成, 再去把結果取回便可
        return report(s);
    // 咱們先看一下結果的取回邏輯 report(), 果真不出意外的簡單, 只管取 outcome 便可
     * Returns result or throws exception for completed task.
     * @param s completed state value
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 正常執行完成, 直接返回
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        // 業務異常則會被包裝成 ExecutionException
        throw new ExecutionException((Throwable)x);
    // 看到取結果這麼簡單,那麼 等待結束的邏輯的呢?看起來好像沒那麼簡單了
     * Awaits completion or aborts on interrupt or timeout.
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // 中斷則退出
            if (Thread.interrupted()) {
                // 因 q 是鏈表的頭,因此會移除全部的等待隊列,即中斷是對全部線程的
                throw new InterruptedException();

            int s = state;
            // 執行完成後,將線程置空便可,刪除工做會有其餘地方完成
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            // 正在處理結果,稍做等待便可
            else if (s == COMPLETING) // cannot time out yet
            // 其餘狀況,先建立本身的等待標識,以便在下一次循環中進行入隊等待
            else if (q == null)
                q = new WaitNode();
            // 進行一次入隊等待,將 q 做爲頭節點
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 限時的等待,等待超時後,直接返回當前狀態便可
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    return state;
                // 最長等待預約時間
                LockSupport.parkNanos(this, nanos);
            // 此處進行無限期等待,但當被喚醒時,必定有狀態變動的時候,應該會在下一個週期結束循環


    1. 實現Runnable接口,由jvm進行線程調用;
    2. 包裝 Callable.call()方法,帶返回值,當線程被調起時,轉給 call() 方法執行,並返回結果;
    3. 將結果封裝到當前future實例中,以備查;
    4. 當用戶調用get()方法時,保證狀態完成狀況下,最快速地返回結果;


4、擴展: Future.get() vs Thread.join()


  而 Thread.join() 則純粹是爲了等待異步線程執行完成,那它們有什麼殊途同歸之妙嗎?來看下

    // Thread.join(), 經過 isAlive() 判斷是否完成
     * Waits for this thread to die.
     * <p> An invocation of this method behaves in exactly the same
     * way as the invocation
     * <blockquote>
     * {@linkplain #join(long) join}{@code (0)}
     * </blockquote>
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
    public final void join() throws InterruptedException {

     * Waits at most {@code millis} milliseconds for this thread to
     * die. A timeout of {@code 0} means to wait forever.
     * <p> This implementation uses a loop of {@code this.wait} calls
     * conditioned on {@code this.isAlive}. As a thread terminates the
     * {@code this.notifyAll} method is invoked. It is recommended that
     * applications not use {@code wait}, {@code notify}, or
     * {@code notifyAll} on {@code Thread} instances.
     * @param  millis
     *         the time to wait in milliseconds
     * @throws  IllegalArgumentException
     *          if the value of {@code millis} is negative
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");

        // 無限期等待
        if (millis == 0) {
            while (isAlive()) {
                // 這是個 native 方法,即由jvm進行控制
                // Thread 任務執行完成後,將進行 notifyAll()
                // 同理下面的限時等待
        } else {
            // 限時等待
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                now = System.currentTimeMillis() - base;

  能夠看到, Thread.join() 的等待邏輯是依賴於 jvm 的調度的, 經過 wait/notify 機制實現。與 Future.get() 相比,它是在 以後的,且沒法獲取結果。


5、Runnable如何包裝成Callable ?

  Callable 其實就只是實現了一個 call() 方法而已,若是咱們只實現了 Runnable, 是否就拿不到返回值呢?並非,咱們能夠直接指定返回值對象或者不指定,使用Runnable進行submit();

    // 不指定返回值的 Runnable, 此處的返回值必定 void
    public Future<?> submit(Runnable task);
    // 指定返回值的 Runnable, 由 T 進行返回值接收
    public <T> Future<T> submit(Runnable task, T result);

  可是 Runnable 是怎麼變成 Callable 的呢?其實就是一個 適配器模式的應用,咱們來看一下!

    // AbstractExecutorService.submit()
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 明確返回值爲 Void
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        return ftask;
    // 一樣使用 FutureTask 進行封裝,只是調用了不一樣的構造器
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    // FutureTask, 使用 Executors 工具類生成一個 callable, 屏蔽掉 Callable 與 Runnable 的差別 
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    // Executors 使用一個適合器類將 Runnable 封裝成 Callable
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result.  This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return
     * @param <T> the type of the result
     * @return a callable object
     * @throws NullPointerException if task null
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    // 而 RunnableAdapter 也是很簡單, 僅將 call() 轉而調用 run() 方法便可
     * A callable that runs given task and returns given result
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        public T call() {
            return result;


  可是有一個點咱們能夠看到,那就是 result 的獲取,其實就是傳入什麼值,就返回值。而若是想在想要改變其結果,惟一的辦法是使 result 變量 對 Runnable.run() 可見,從而在 run() 方法中改變其值。這就看你怎麼用了!
