@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
@FunctionalInterface public interface Runnable { public abstract void run(); }
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Callable Runnable Future 都是爲異步執行設計的接口類。Callable與Runnable接口的區別是Callable有返回值,而且會拋出異常信息,Runnable沒有返回值,也不容許拋出異常。Future則能夠判斷任務是否執行完,是否取消,以及取消當前任務和獲取結果。java
Runnable runnable = new Runnable() { @Override public void run() { // do business job System.out.println("thread run = " + Math.random()); } }; new Thread(runnable).start(); // new Thread(runnable).run(); System.out.println("done");
定義了一個runnable實例放入Thread而且調用start()方法就能夠啓動一個線程來執行。這裏注意是調用Thread.start()方法,不能調用Thread.run()方法。run()實際上是串行執行的,start()纔會啓動線程異步執行。數據結構
/* What will be run. */ private Runnable target; public Thread(Runnable target) { init(null, target, "Thread-" + nextThreadNum(), 0); } @Override public void run() { if (target != null) { target.run(); } } /** * Causes this thread to begin execution; the Java Virtual Machine * calls the <code>run</code> method of this thread. */ public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } private native void start0();
閱讀Thread類的源代碼,構造函數將runnable對象賦值給內部的target變量。調用run()就是直接調用target對象的run()。調用start()實際上是調用start0(),而start0()是一個native本地方法,由JVM調用操做系統類庫來啓動線程。dom
Callable<Double> callable = new Callable<Double>() { @Override public Double call() throws Exception { // do business job return Math.random(); } }; FutureTask<Double> future = new FutureTask<>(callable); new Thread(future).start(); // do business job System.out.println("future result = " + future.get()); System.out.println("future result = " + future.get(100, TimeUnit.MILLISECONDS));
Callable必需要結合Future來一塊兒使用,聲明一個callable實例,經過這個實例再生成一個FutureTask類型的實例放入Thread執行。當調用future.get()的時候,若是future task已經執行完畢則能夠得到結果,不然堵塞當前線程直到線程執行完而且返回結果,future.get(long, TimeUnit)支持獲取執行結果超時限制。異步
爲何必定要生成這個FutureTask實例?緣由是Thread的構造方法只接受Runnable類型的變量ide
Thread(Runnable target) {...} Thread(Runnable target, AccessControlContext acc) {...} Thread(Runnable target, String name) {...}
再看一下FutureTask的定義 函數
public class FutureTask<V> implements RunnableFuture<V> { ... public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } ... } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
FutureTask繼承自RunnableFuture,而RunnableFuture同時繼承了Runnable和Future。FutureTask是Future的實現類又是Runnable的實現類,能夠得出的結論是Future提供的方法都是基於Callable接口實現的。this
Future內部定義了一組線程的運行狀態spa
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
從註釋裏面看出來一共有4種狀態的變化操作系統
/** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
callable即實際運行的callable對象,outcome是運行結果存儲的變量,waiters是一個鏈表結構的東西,實際上是一個對象擁有下面一個對象的指針,後面會解釋(註釋上說這個叫Treiber stack)。線程
線程啓動以後實際調用的是run()方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
run()方法實際調用的是callble.call()方法,獲取到返回值以後調用set()方法,set()方法經過CAS將線程狀態從NEW設置爲COMPLETING,再將返回值設置到outcome變量,而後將線程狀態設置爲NORMAL完成的狀態。最後的finishCompletion()方法下面再講解。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
get()方法判斷線程狀態,若是線程狀態不是小於等於COMPLETING的狀態調用report(),report()方法判斷線程狀態爲NORMAL就直接返回outcome的值,若是線程狀態爲CANCELLED就拋出CancellationException異常。
若是線程狀態小於等於COMPLETING,調用awaitDone方法
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
awaitDone方法內有一個循環,循環內一串判斷條件
循環內的判斷條件都是排他的,這個循環通常會循環三次。
爲何須要一個鏈表?
我能想到的是在多個線程一塊兒調用get()/get(timeout)方法的時候才須要這個鏈表,由於get()/get(timeout)在task處於非完成狀態時是調用LockSupport.park*()阻塞線程的,在多個線程進行get操做,須要一個鏈表來維護這些線程,一會在task執行完或者出現異常的時候,會在這個鏈表中找到正在被堵塞的線程調用LockSupport.unpark()來解除堵塞。由於這樣的機制才須要一個鏈表。
再回顧剛纔的FutureTask.run()方法,出現異常的時候會調用setException(ex),線程執行完以後會執行set(result)。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
出現異常的時候將線程的最終狀態設置爲EXCEPTIONAL,正常結束的時候將線程的最終狀態設置爲NORMAL,而後都會調用finishCompletion()。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } protected void done() { }
這個方法會循環這個waiters鏈表,取出裏面正在等待的線程逐個調用LockSupport.unpark(t)來解除堵塞。經過CAS操做將waiters設置爲null。
這裏還有一個done()方法,方法體是空的。能夠被子類重寫,作一些線程執行完成以後的操做。這個也能夠會稱爲回調函數。