怎麼獲取Thread返回值?(FutureTask解讀)

墮落的人生啊……

如何獲取Thread返回值?

偶然間看到這個問題,對於標配jdk1.8的咱們是否是分分鐘拍出答案?
答曰:簡單,Callable,完美解決,下一題……java

但是,身處jdk1.4(甚至更早)的前輩們,要怎麼作才能拿到線程返回值呢?或者說,禁用Callable技能,怎麼獲取線程返回值?
嗯,這彷佛是線程間通訊的問題;只有Runnable做爲武器,有些麻煩,接受挑戰!程序員

首先,定義任務Task多線程

// 任務Task
class Task implements Runnable{

    @Getter
    Object result;    //返回值

    @Override
    public void run() {
        try {
            // 模擬某耗時邏輯
            System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 計算獲得最終結果
        result = Integer.valueOf(9987);
        
        System.out.println(String.format("[%s] 執行完畢..",Thread.currentThread().getName()));
    }
}

以後,啓動線程併發

public static void main(String[] args) {
    // 使用了內部類,採用以下方式new
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    
    final String threadName = "T-1";
    Thread thread = new Thread(task,threadName);

    thread.start();
    
}

好,T-1線程啓動了,看樣子能很好的執行任務,問題是main方法中怎麼獲取到Task的返回值result呢?
以目前的代碼運行,效果絕對是T-1線程單飛,和main線程沒啥聯繫。jvm

我有一項能力,老是能第一時間至關最簡易的方法。ide

簡單方式

main線程辛苦些,多跑跑腿檢查下result的狀態:函數

public static void main(String[] args) {
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    final String threadName = "T-1";
    Thread thread = new Thread(task,threadName);
    thread.start();
    
    // main線程頻繁檢查T-1線程
    while (true){
        if(task.getResult()!=null){
            System.out.println(String.format("結果 task=%s",task.getResult()));
            break;
        }

        // 讓cpu稍微冷靜一下
        TimeUnit.MILLISECONDS.sleep(200L);
        System.out.println(String.format("[main] 勤勞檢查result中(result=%s)",task.getResult()));
    }
}

運行效果,多是這樣的:工具

[T-1] 執行中..
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[main] 勤勞檢查result中(result=null)
[T-1] 執行完畢..
[main] 勤勞檢查result中(result=9987)
結果 task=9987

雖然已加入了對cpu而言人性化的休眠方法(sleep),但這依然不是個很好的方案。該方案極大的操勞了main線程,須要一遍遍的檢查子線程的運行狀況——子線程是否將最終結果賦值。
那有沒有一種方式,能夠在T-1運行完以後,告訴main線程呢?
做爲一個老派(技術陳舊)的程序員,我首先想到的是wait..notify組合this

wait..notify組合方式

  • wait部分,檢查result值,若是爲null則表示T-1還未執行完,安心等待
public static void main(String[] args){

        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task();
        final String threadName = "T-1";
        Thread thread = new Thread(task,threadName);
        thread.start();

        while (true){
            //檢查result狀態,尚未賦值,則等待
            if(task.getResult()==null){
                System.out.println(String.format("[%s] 等待執行..",Thread.currentThread().getName()));
                synchronized (task){
                    try {
                        task.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

            if(task.getResult()!=null){
                System.out.println(String.format("結果 task=%s",task.getResult()));
                break;
            }
        }

    }
  • notify部分,增長喚醒邏輯
class Task implements Runnable{

    @Getter
    Object result;
    
    @Override
    public void run() {
        try {
            System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        result = Integer.valueOf(9987);
    
        //喚醒wait的對象
        synchronized (this){
            this.notify();
        }
        System.out.println(String.format("[%s] 執行完畢..",Thread.currentThread().getName()));
    }
}

改造後,執行效果以下:spa

[T-1] 執行中..
[main] 等待執行..
[T-1] 執行完畢..
結果 task=9987

LockSupport實現

其實也可使用LockSupport實現,和 wait / notify 相似,直接貼出完整代碼吧:

public class CallbackTest {
    class Task implements Runnable{
    
        @Getter
        Object result;
    
        // 構造函數傳入調用線程(main線程)
        Thread runner;
        Task(Thread runner){
            this.runner = runner;
        }
    
        @Override
        public void run() {
            try {
                System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName()));
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            result = Integer.valueOf(9987);
    
            //喚醒main線程
            synchronized (this){
                LockSupport.unpark(runner);
            }
            System.out.println(String.format("[%s] 執行完畢..",Thread.currentThread().getName()));
        }
    }
    
    public static void main(String[] args) {
    
        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task(Thread.currentThread());
        final String threadName = "T-1";
        Thread thread = new Thread(task,threadName);
    
        thread.start();
    
        while (true){
            if(task.getResult()==null){
                System.out.println(String.format("[%s] 等待執行..",Thread.currentThread().getName()));
                LockSupport.park(); //main線程阻塞
            }
    
            if(task.getResult()!=null){
                System.out.println(String.format("結果 task=%s",task.getResult()));
                break;
            }
        }
    
    }
}

Callable使用

至此,咱們至關於能夠用本身的方式獲取到Thread的返回值了,此時回顧下文章開始初的解答:

偶然間看到這個問題,對於標配`jdk1.8`的咱們是否是分分鐘拍出答案?
答曰:簡單,`Callable`,完美解決,下一題……

當時很天然的就回答了Callable,先看看它是怎麼用的。

public class CallbackTest {

    class Task implements Callable<Object> {
    
        @Override
        public Object call() {
            try {
                // 某耗時邏輯
                System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName()));
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return Integer.valueOf(9987);
        }
    }
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task();

        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<Object> future = es.submit(task);
        System.out.println("結果:"+future.get());

        es.shutdown();

    }
}

代碼並不複雜,demo 中獲取返回值的方式是future.get(),這是一個阻塞方法;在子線程執行完(return)以前會一直阻塞。沒用過的開發兄弟(姐妹?)們自行科普吧,很少解釋了。

Callable源碼以下:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable自己就一接口,沒什麼玄機,玄機在 Future 或者說FutureTask上。

重頭戲來了,看看源碼是怎麼實現從其它線程獲取返回值的。

FutureTask解析

先瞧瞧FutureTask的江湖地位:
clipboard.png

能夠看出,FutureTaskFuture接口Runnable接口的實現類
此事留個大概印象,咱們來看下FutureTask是怎麼和Callable關聯上的?

FutureTask和Callable的關係

(可對照下文,追下源碼;若是實在不理解,可直接跳到本章節末尾結論處)

ExecutorService es = Executors.newSingleThreadExecutor();
Future<Object> future = es.submit(task);

以例子中的ExecutorService的submit方法做爲入口,實際的實現方法爲AbstractExecutorServicesubmit

/* `AbstractExecutorService` */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);    //註釋1-構建了FutureTask
    execute(ftask);    //註釋2-最終會調用ftask的run方法,也就是調用`步驟1構建的FutureTask對象的run方法
    return ftask;
}
...

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);    //1.1-調用FutureTask的構造函數
}
  • 註釋1 - 觀察FutureTask的構造函數:
// callable是FutureTask的成員變量
private Callable<V> callable;
    
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;    //爲成員變量賦值
    this.state = NEW;       // ensure visibility of callable
}

結論1:經ExecutorService的穿針引線,Callable會最終賦值給FutureTask的成員變量

  • 註釋2 - 再次追蹤下執行部分,注意看註釋的分析:
/* `AbstractExecutorService`的`submit` */
public <T> Future<T> submit(Callable<T> task) {
    ...
    execute(ftask);    //註釋2-最終會調用ftask的run方法,也就是調用`步驟1構建的FutureTask對象的run方法
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓

/* ThreadPoolExecutor的execute */
public void execute(Runnable command) {
    ...
    addWorker(null, false);    //添加到worker中
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓
    
/* ThreadPoolExecutor的addWorker */
private boolean addWorker(Runnable firstTask, boolean core) {
    w = new Worker(firstTask);    //`Worker`封裝
    final Thread t = w.thread;
    
    ...
            t.start();    //註釋3-worker中的thread執行start方法,會調用對應Runnable的run方法
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓
    
/* 內部類`Work` */
final Thread thread;    // 成員變量
Runnable firstTask;    // 成員變量

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;    //賦值成員變量firstTask
    this.thread = getThreadFactory().newThread(this);    //建立新線程,並賦值成員變量thread
}

// 3.1-`註釋3`處的start,會執行此處的run方法,進而會調用runWorker方法
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    ...
    Runnable task = w.firstTask;
    ...
                    task.run();    //##### 注意了,最終會調用到此處 #####
    ...
}

task.run()中的task又是什麼呢,就是在最開始賦值的FutureTask(註釋1處),看下它的run方法

public void run() {
    ...
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();    //會調用callable的call方法,這個方法中就是咱們本身定義的邏輯
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);    //註釋4-賦值動做
        }
    ...
}

結論2:調用過程,通過一系列週轉,最終會調用Callable的call方法(也就是咱們的自定義邏輯)

  • 註釋4 - 看下此處的賦值動做
//成員變量
private Object outcome;

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;    //賦值給成員變量
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

綜上,可得出結論:FutureTask包裝了Callable,執行期call方法後,將返回值賦值給成員變量

接下來探索下返回值的獲取,即Future.get()的實現。

返回值獲取

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)    // 1-未完成狀態,線程阻塞
        s = awaitDone(false, 0L);
    return report(s);    // 2-已完成狀態,直接獲取
}

// 1.1-阻塞
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    ...
        LockSupport.park(this);    //阻塞
    ...

// 2.1-返回了outcome
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

get() 的邏輯並不複雜:

  1. 判斷狀態,若是此時還未執行完,或者說還未給成員變量outcome(call() 方法返回值的引用)賦值,阻塞
  2. 若是此時已經給outcome賦值,則將該對象返回

怎麼樣,是否是有點似曾相識?這和咱們本身實現的那一版的邏輯是一致的!
再次看下set() 方法,裏面應該會有LockSupport.unpark(Thread t) 方法。

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();    //這裏看上去很可疑
    }
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);    //吼吼吼,抓到你了,果真有LockSupport.unpark(t)
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

果真找到了park方法對應的unpark,證實咱們的推斷是正確的——FutureTask的核心實現思路,與咱們本身的實現方式是一致的(尤爲LockSupport版本),即子線程未完成時阻塞,已完成時釋放。

主邏輯分析完了,再來兩個開胃甜點。

關注點

LockSupport

對比本身和源碼的實現,都用LockSupport,使用的阻塞方法卻不相同——park() vs park(Object blocker)

差異在哪?引用官方文檔的解釋:

The three forms of park each also support a blocker object parameter. 
This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. 
當線程被阻塞時記錄此對象,以容許監視和診斷工具識別線程被阻塞的緣由。
(Such tools may access blockers using method getBlocker(java.lang.Thread).) 

The use of these forms rather than the original forms without this parameter is strongly encouraged. 
有參數的park(Object blocker)是被強烈推薦的

The normal argument to supply as a blocker within a lock implementation is this.

按文檔中的意思:傳入的blocker對象,至關於一個標誌對象,線程阻塞時會記錄下來。下面的例子能明顯看出差異

舉例說明:(轉自 https://www.jianshu.com/p/835...

private static void parkVsParkBlocker() {
    Thread t1 = new Thread(() -> {
        LockSupport.park();
    }, "t1");
    t1.start();

    Object blocker = new Object();
    Thread t2 = new Thread(() -> {
        LockSupport.park(blocker);
    }, "t2");
    t2.start();

    LockSupport.getBlocker(t2);

    unpark(t1, 60);
    unpark(t2, 60);
}

Print java stack trace of a given jvm process.
jstack jps -l | grep LockSupport | awk '{print $1}'

clipboard.png

VarHandle

FutureTask做爲抽象出的工具類,考慮了多線程環境下的get()的狀況,這部分在前文故意忽略了。
而併發環境下的數據統一,主要靠volatile關鍵字+CAS來達成。(經典模式)

// 狀態,記錄子線程執行狀況
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// 記錄子線程,運行Callable.call()的線程
private volatile Thread runner;
// 等待節點,鏈表
private volatile WaitNode waiters;

volatile關鍵字,此處主要用於讓其它線程可見(可見性);那CASCompare And Sweep)是作什麼的?

本質上,它就是個樂觀鎖:

  1. 比較某內存地址下的某一變量的當前值猜測值是否一致,若是一致,原子替換該變量爲新值,return true
  2. 若是不一致,return false

jdk 9以前,主要靠Unsafe實現CAS;自jdk 9開始,推出了VarHandle旨在替代 AtomicXX 工具,以及方便開發人員使用Unsafe的部分權能

以狀態 state 的變動爲例,看下VarHandle是如何完成CAS的:

private volatile int state;

/* 聲明和賦值 */
private static final VarHandle STATE;
static{
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();    //1 - 經過MethodHandles.lookup()聲明MethodHandles.Lookup對象
        STATE = l.findVarHandle(FutureTask.class, "state", int.class);    //2 - 賦值VarHandle STATE,此時STATE和state就創建了某種聯繫
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
}

/* 調用 */
protected void set(V v) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {    // 3 - 當前對象,將state變量,由NEW改爲COMPLETING
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}
相關文章
相關標籤/搜索