解析 Callable Runnable Future 使用和原理

概念

Callable類的定義

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Runnable類的定義 

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Future類的定義 

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Callable Runnable Future 都是爲異步執行設計的接口類。Callable與Runnable接口的區別是Callable有返回值,而且會拋出異常信息,Runnable沒有返回值,也不容許拋出異常。Future則能夠判斷任務是否執行完,是否取消,以及取消當前任務和獲取結果。java

使用實例

Runnable實例

Runnable runnable = new Runnable() {
    @Override
    public void run() {

        // do business job

        System.out.println("thread run = " + Math.random());
    }
};

new Thread(runnable).start();
//  new Thread(runnable).run();

System.out.println("done");

定義了一個runnable實例放入Thread而且調用start()方法就能夠啓動一個線程來執行。這裏注意是調用Thread.start()方法,不能調用Thread.run()方法。run()實際上是串行執行的,start()纔會啓動線程異步執行。數據結構

start()和run()區別

/* What will be run. */
private Runnable target;

public Thread(Runnable target) {
    init(null, target, "Thread-" + nextThreadNum(), 0);
}

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

/**
 * Causes this thread to begin execution; the Java Virtual Machine
 * calls the <code>run</code> method of this thread.
 */
public synchronized void start() {
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
        }
    }
}

private native void start0();

閱讀Thread類的源代碼,構造函數將runnable對象賦值給內部的target變量。調用run()就是直接調用target對象的run()。調用start()實際上是調用start0(),而start0()是一個native本地方法,由JVM調用操做系統類庫來啓動線程。dom

Callable+Future實例

Callable<Double> callable = new Callable<Double>() {
    @Override
    public Double call() throws Exception {

        // do business job

        return Math.random();
    }
};

FutureTask<Double> future = new FutureTask<>(callable);

new Thread(future).start();

// do business job

System.out.println("future result = " + future.get());
System.out.println("future result = " + future.get(100, TimeUnit.MILLISECONDS));

Callable必需要結合Future來一塊兒使用,聲明一個callable實例,經過這個實例再生成一個FutureTask類型的實例放入Thread執行。當調用future.get()的時候,若是future task已經執行完畢則能夠得到結果,不然堵塞當前線程直到線程執行完而且返回結果,future.get(long, TimeUnit)支持獲取執行結果超時限制。異步

爲何必定要生成這個FutureTask實例?緣由是Thread的構造方法只接受Runnable類型的變量ide

Thread(Runnable target) {...}
Thread(Runnable target, AccessControlContext acc) {...}
Thread(Runnable target, String name) {...}

再看一下FutureTask的定義 函數

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }
    ...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

FutureTask繼承自RunnableFuture,而RunnableFuture同時繼承了Runnable和Future。FutureTask是Future的實現類又是Runnable的實現類,能夠得出的結論是Future提供的方法都是基於Callable接口實現的。this

Future 源碼閱讀

Future線程狀態

Future內部定義了一組線程的運行狀態spa

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

從註釋裏面看出來一共有4種狀態的變化操作系統

  1. NEW(初始化)-> COMPLETING(運行中)-> NORMAL(完成狀態)
  2. NEW(初始化)-> COMPLETING(運行中)-> EXCEPTIONAL(運行發生錯誤)
  3. NEW(初始化)-> CANCELLED(還未運行已經被取消)
  4. NEW(初始化)-> INTERRUPTING(運行中被取消)-> INTERRUPTED(被取消狀態)

Future內部變量

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

callable即實際運行的callable對象,outcome是運行結果存儲的變量,waiters是一個鏈表結構的東西,實際上是一個對象擁有下面一個對象的指針,後面會解釋(註釋上說這個叫Treiber stack)。線程

Future.run()

線程啓動以後實際調用的是run()方法

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

run()方法實際調用的是callble.call()方法,獲取到返回值以後調用set()方法,set()方法經過CAS將線程狀態從NEW設置爲COMPLETING,再將返回值設置到outcome變量,而後將線程狀態設置爲NORMAL完成的狀態。最後的finishCompletion()方法下面再講解。

Future.get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

get()方法判斷線程狀態,若是線程狀態不是小於等於COMPLETING的狀態調用report(),report()方法判斷線程狀態爲NORMAL就直接返回outcome的值,若是線程狀態爲CANCELLED就拋出CancellationException異常。

若是線程狀態小於等於COMPLETING,調用awaitDone方法

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

awaitDone方法內有一個循環,循環內一串判斷條件

  1. 若是線程狀態大於COMPLETING,將q(waitNode)變量的thread設置爲null,而後把線程狀態返回出去
  2. 若是線程狀態等於COMPLETING,調用Thread.yield()讓出當前線程的CPU使用時間
  3. 若是q==null,建立一個新的WaitNode節點
  4. 若是queued==false(還未被加入等待隊列),使用CAS操做將上一步建立的waitNode設置爲waiters鏈表的表頭
  5. 若是有超時限制,判斷是否超時,若是超時,將waiters鏈表的節點移除,若是未超時,調用LockSupport.parkNanos()阻塞線程
  6. 以上都不知足,調用LockSupport.park()阻塞線程

循環內的判斷條件都是排他的,這個循環通常會循環三次。

  • 第一次循環執行q==null的條件,建立WaitNode節點。
  • 第二次循環執行!queued條件,將剛纔建立的waitNode節點設置爲waiters鏈表的表頭。WaitNode類存了一個線程的引用以及下一個WaitNode節點的引用,這是一個單向鏈表的數據結構。
  • 第三次循環執行到LockSupport.park*()阻塞線程

爲何須要一個鏈表?

我能想到的是在多個線程一塊兒調用get()/get(timeout)方法的時候才須要這個鏈表,由於get()/get(timeout)在task處於非完成狀態時是調用LockSupport.park*()阻塞線程的,在多個線程進行get操做,須要一個鏈表來維護這些線程,一會在task執行完或者出現異常的時候,會在這個鏈表中找到正在被堵塞的線程調用LockSupport.unpark()來解除堵塞。由於這樣的機制才須要一個鏈表。

再回顧剛纔的FutureTask.run()方法,出現異常的時候會調用setException(ex),線程執行完以後會執行set(result)。

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

出現異常的時候將線程的最終狀態設置爲EXCEPTIONAL,正常結束的時候將線程的最終狀態設置爲NORMAL,而後都會調用finishCompletion()。

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

protected void done() { }

這個方法會循環這個waiters鏈表,取出裏面正在等待的線程逐個調用LockSupport.unpark(t)來解除堵塞。經過CAS操做將waiters設置爲null。

這裏還有一個done()方法,方法體是空的。能夠被子類重寫,作一些線程執行完成以後的操做。這個也能夠會稱爲回調函數。

相關文章
相關標籤/搜索