上面就是我結合本身的理解分析的future機制的設計思想,可能說的不夠全,但願有人能夠補充。下面會講解java future的具體實現java
execute方式:咱們知道一個類若是實現了runnable接口,它就可以被線程來執行,由於實現了runnable接口就擁有了run方法,因此可以被執行。因此最簡單的異步線程執行方式以下:利用Executors框架來建立一個線程池,而後調用execute方法來提交異步任務,注意這裏的execute方法是沒有返回的,也就是說咱們無法知道提交的任務的執行結果。node
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->System.out.println("異步執行!"));複製代碼
submit方式:前面提到的java給咱們提供的線程池接口ExecutorService提供了兩種提交異步任務的方式,一種就是沒有返回值的execute方法(因爲ExecutorService接口是extends了Executor接口的,因此擁有了execute方法),還有一種是帶有返回值的submit方法。在submit方法中,提供了三個重載方法:多線程
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);複製代碼
能夠看到,submit方法支持實現了callable和runnable的task,不一樣於runnable只有沒有返回值的run方法,callable提供了一個帶返回值的call方法,能夠有返回值。正是由於runnable沒有返回值,因此第二個重載方法返回值爲null,第三個重載方法裏面能夠從外部設置一個返回值,這個返回值將會做爲runnable的返回值。具體代碼以下:併發
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}複製代碼
兩個方法都調用newTaskFor方法來建立了一個RunnableFuture的對象,而後調用execute方法來執行這個對象,說明咱們線程池真正執行的對象就是這個RunnableFuture對象。框架
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}複製代碼
由上面代碼看出就是建立了一個futureTask對象,這個對象封裝了咱們提供的runnable和callable對象。futuretask實現了runnablefuture接口,這就是說明futuretask具有了runnable的功能(能被線程執行)和future功能(可以獲取自身執行的結果和狀態)。能被線程執行功能是咱們本身經過實現runnable接口或者callable接口來完成的。future功能前面咱們提過是很通用的功能,因此java給咱們實現了。下面就進入futuretask查看。異步
futuretask對象:futuretask是真正的future功能實現的地方。前面說過這個一個RunnableFuture對象,因此咱們看看它的run方法this
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 封裝的callable對象 */
private Callable<V> callable;
/** task的執行結果 */
private Object outcome;
/** 當前線程池的哪一個線程正在執行這個task */
private volatile Thread runner;
/** 等待的線程列表 */
private volatile WaitNode waiters;
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;// 1. 內部包裝的一個callable對象
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// 2. 調用包裝的call方法
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);//3. 設置返回值
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}複製代碼
前面提到futuretask是封裝了runnable和callable的,但是爲何內部只有一個callable呢,其實是由於futuretask本身調用適配器轉換了一下:代碼以下,採用了java的適配器模式。spa
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}複製代碼
futuretask的run方法調用了內部封裝的callable對象的call方法,獲取返回值,而且設置到本身outcome中,state表明執行的狀態,這樣就經過代理的方式代理了咱們的callable的call方法,幫助咱們獲取執行的結果和狀態,因此咱們本身編寫業務邏輯的時候就不用去管這層通用的邏輯了。這裏面還有一個waitnode咱們單獨講線程
WaitNode: 經過前面的分析咱們知道,實際上咱們submit任務以後返回的future對象就是線程池爲咱們建立的runnablefuture對象,也就是futuretask這個對象。future接口爲咱們提供了一系列的方法,以下設計
V get() throws InterruptedException, ExecutionException;
boolean cancel(boolean mayInterruptIfRunning);複製代碼
上面是主要的兩個方法,get和cancel,cancel的時候調用runner的interrupt方法便可
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}複製代碼
其中unsafe是用於cas操做的,在java併發包中大量用到,後續會講解。
get方法的設計是阻塞的,也就是說若是結果沒有返回時須要等待的,因此纔會有waitnode這個對象的產生,當多個線程都調用futuretask的get方法的時候,若是結果還沒產生,就都須要等待,這時候全部等待的線程就會造成一個鏈表,因此waitnode實際上就是線程的鏈表。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}複製代碼
再看get方法:若是任務沒有完成就調用awaitDone進入阻塞,若是完成了直接調用report返回結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}複製代碼
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//1. 若是等待過程當中,被中斷過了,那麼就移除本身
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)//2. cas更新鏈表節點
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);//3. locksupport原語讓線程進入休眠
}
else
LockSupport.park(this);
}
}複製代碼
仍是比較好看懂,其中LockSupport是原語,讓線程進行休眠。若是線程在休眠中醒來了,有多是多種狀況,好比get的時間到了,也就是從3中醒來了,這樣的話下一次循環就會判斷時間到了,從而remove掉節點退出。還有可能等待的線程被interrupt了,這時候就會走到1的邏輯,經過判斷中斷標記將其remove掉。
既然有了waitnode這個等待鏈表,那麼確定會有相應的喚醒機制,當執行完畢以後就會將waitnode鏈表上的線程一次喚醒,以下。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}複製代碼