FutureTask介紹 FutureTask是一種能夠取消的異步的計算任務。它的計算是經過Callable實現的,能夠把它理解爲是能夠返回結果的Runnable。java
使用FutureTask的優點有:node
能夠獲取線程執行後的返回結果; 提供了超時控制功能。 它實現了Runnable接口和Future接口:算法
QQ20170406-230647.png 什麼是異步計算呢?也就是說,在讓該任務執行時,不須要一直等待其運行結束返回結果,而是能夠先去處理其餘的事情,而後再獲取返回結果。例如你想下載一個很大的文件,這時很耗時的操做,不必一直等待着文件下載完,你能夠先去吃個飯,而後再回來看下文件是否下載完成,若是下載完成就可使用了,不然還須要繼續等待。編程
FutureTask的實現 FutureTask的狀態 FutureTask內部有這樣幾種狀態:安全
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
複製代碼
看名字應該很好理解了,當建立一個FutureTask對象是,初始的狀態是NEW,在運行時狀態會轉換,有4中狀態的轉換過程:bash
NEW -> COMPLETING -> NORMAL:正常執行並返回; NEW -> COMPLETING -> EXCEPTIONAL:執行過程當中出現了異常; NEW -> CANCELLED;執行前被取消; NEW -> INTERRUPTING -> INTERRUPTED:取消時被中斷。 使用FutureTask 下面看一下具體的使用過程:多線程
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
result += i;
}
return result;
}
});
executor.execute(future);
System.out.println(future.get());
}
}
複製代碼
FutureTask內部結構併發
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執行callable的線程 **/
private volatile Thread runner;
/**
* Treiber stack of waiting threads
* 使用Treiber算法實現的無阻塞的Stack,
* 用於存放等待的線程
*/
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
public V get() throws InterruptedException, ExecutionException {
...
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
...
}
...
```
這裏的waiters理解爲一個stack,由於在調用get方法時任務可能尚未執行完,這時須要將調用get方法的線程放入waiters中。
最重要的兩個get方法,用於獲取返回結果,第二種提供了超時控制功能。
FutureTask構造方法
FutureTask有兩個構造方法:
複製代碼
public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }異步
第二種構造方法傳入一個Runnable對象和一個返回值對象,由於Runnable是沒有返回值的,因此要經過result參數在執行完以後返回結果。
run方法
FutureTask實現了Runnable接口,因此須要實現run方法,代碼以下:
複製代碼
public void run() { /* * 首先判斷狀態,若是不是初始狀態,說明任務已經被執行或取消; * runner是FutureTask的一個屬性,用於保存執行任務的線程, * 若是不爲空則表示已經有線程正在執行,這裏用CAS來設置,失敗則返回。 */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; // 只有初始狀態纔會執行 if (c != null && state == NEW) { V result; boolean ran; try { // 執行任務 result = c.call(); // 若是沒出現異常,則說明執行成功了 ran = true; } catch (Throwable ex) { result = null; ran = false; // 設置異常 setException(ex); } // 若是執行成功,則設置返回結果 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() // 不管是否執行成功,把runner設置爲null runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 若是被中斷,則說明調用的cancel(true), // 這裏要保證在cancel方法中把state設置爲INTERRUPTED // 不然可能在cancel方法中還沒執行中斷,形成中斷的泄露 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }ide
總結一下run方法的執行過程
只有state爲NEW的時候才執行任務;
執行前要設置runner爲當前線程,使用CAS來設置是爲了防止競爭;
若是任務執行成功,任務狀態從NEW轉換爲COMPLETING,若是執行正常,設置最終狀態爲NORMAL;若是執行中出現了異常,設置最終狀態爲EXCEPTIONAL;
喚醒並刪除Treiber Stack中的全部節點;
若是調用了cancel(true)方法進行了中斷,要確保在run方法執行結束前的狀態是INTERRUPTED。
這裏涉及到3個比較重要的方法:setException,set和handlePossibleCancellationInterrupt。
setException方法
複製代碼
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
若是在執行過程當中(也就是調用call方法時)出現了異常,則要把狀態先設置爲COMPLETING,若是成功,設置outcome = t,outcome對象是Object類型的,用來保存異常或者返回結果對象,也就是說,在正常的執行過程當中(沒有異常,沒有調用cancel方法),outcome保存着返回結果對象,會被返回,若是出現了異常或者中斷,則不會返回並拋出異常,這個在介紹report方法時會講到。
接着設置狀態爲EXCEPTIONAL,這也是最終的狀態。
finishCompletion方法稍後再分析。
set方
複製代碼
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
很簡單,與setException相似,只不過這裏的outcome是返回結果對象,狀態先設置爲COMPLETING,而後再設置爲MORMAL。
handlePossibleCancellationInterrupt方法
複製代碼
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
handlePossibleCancellationInterrupt方法要確保cancel(true)產生的中斷髮生在run或runAndReset方法執行的過程當中。這裏會循環的調用Thread.yield()來確保狀態在cancel方法中被設置爲INTERRUPTED。
這裏不可以清除中斷標記,由於不能肯定中斷必定來自於cancel方法。
finishCompletion方法
```
private void finishCompletion() {
// assert state > COMPLETING;
// 執行該方法時state必須大於COMPLETING
// 逐個喚醒waiters中的線程
for (WaitNode q; (q = waiters) != null;) {
// 設置棧頂節點爲null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
// 喚醒線程
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 若是next爲空,說明棧空了,跳出循環
WaitNode next = q.next;
if (next == null)
break;
// 方便gc回收
q.next = null; // unlink to help gc
// 從新設置棧頂node
q = next;
}
break;
}
}
// 鉤子方法
done();
callable = null; // to reduce footprint
}
```
在調用get方法時,若是任務尚未執行結束,則會阻塞調用的線程,而後把調用的線程放入waiters中,這時,若是任務執行完畢,也就是調用了finishCompletion方法,waiters會依次出棧並逐個喚醒對應的線程。
由此能夠想到,WaitNode必定是在get方法中被添加到棧中的,下面來看下get方法的實現。
get方法
```
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
```
這兩個方法相似,首先判斷狀態,若是s <= COMPLETING,說明任務已經執行完畢,但set方法或setException方法還未執行結束(還未設置狀態爲NORMAL或EXCEPTIONAL),這時須要將當前線程添加到waiters中並阻塞。
第二種get提供了超時功能,若是在規定時間內任務還未執行完畢或者狀態仍是COMPLETING,則獲取結果超時,拋出TimeoutException。而第一種get會一直阻塞直到state > COMPLETING。
awaitDone方法
awaitDone方法的工做是根據狀態來判斷是否可以返回結果,若是任務還未執行完畢,要添加到waiters中並阻塞,不然返回狀態。代碼以下:
```
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 計算到期時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 若是被中斷,刪除節點,拋出異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 若是任務執行完畢而且設置了最終狀態或者被取消,則返回。
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// s == COMPLETING時經過Thread.yield();讓步其餘線程執行,
// 主要是爲了讓狀態改變
else if (s == COMPLETING) // 讓步其餘線程,若是由其餘線程繼續搶佔執行計算結果
Thread.yield();
// 建立一個WaitNode
else if (q == null)
q = new WaitNode();
// CAS設置棧頂節點,若是該節點已經設置了,則這裏再也不執行設置棧頂。
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 若是設置了超時,則計算是否已經到了開始設置的到期時間
else if (timed) {
nanos = deadline - System.nanoTime();
// 若是已經到了到期時間,刪除節點,返回狀態
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞到到期時間
LockSupport.parkNanos(this, nanos);
}
// 若是沒有設置超時,會一直阻塞,直到被中斷或者被喚醒
else
LockSupport.park(this);
}
}
```
removeWaiter方法
```
private void removeWaiter(WaitNode node) {
if (node != null) {
// 將thread設置爲null是由於下面要根據thread是否爲null判斷是否要把node移出
node.thread = null;
// 這裏自旋保證刪除成功
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// q.thread != null說明該q節點不須要移除
if (q.thread != null)
pred = q;
// 若是q.thread == null,且pred != null,須要刪除q節點
else if (pred != null) {
// 刪除q節點
pred.next = s;
// pred.thread == null時說明在併發狀況下被其餘線程修改了;
// 返回第一個for循環重試
if (pred.thread == null) // check for race
continue retry;
}
// 若是q.thread != null且pred == null,說明q是棧頂節點
// 設置棧頂元素爲s節點,若是失敗則返回重試
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
```
cancel方法
cancel方法用於取消任務,這裏可能有兩種狀況,一種是任務已經執行了,另外一種是還未執行,代碼以下:
```
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// mayInterruptIfRunning參數表示是否要進行中斷
if (mayInterruptIfRunning) {
try {
// runner保存着當前執行任務的線程
Thread t = runner;
// 中斷線程
if (t != null)
t.interrupt();
} finally { // final state
// 設置最終狀態爲INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
```
第一個if判斷可能有些很差理解,其實等價於以下代碼:
```
if (!state == NEW ||
!UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
```
若是狀態不是NEW,或者設置狀態爲INTERRUPTING或CANCELLED失敗,則取消失敗,返回false。
簡單來講有一下兩種狀況:
若是當前任務尚未執行,那麼state == NEW,那麼會嘗試設置狀態,若是設置狀態失敗會返回false,表示取消失敗;
若是當前任務已經被執行了,那麼state > NEW,也就是!state == NEW爲true,直接返回false。
也就是說,若是任務一旦開始執行了(state != NEW),那麼就不能被取消。
若是mayInterruptIfRunning爲true,要中斷當前執行任務的線程。
report方法
get方法在調用awaitDone方法後,會調用report方法進行返回:
```
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
```
很簡單,能夠看到有3中執行狀況:
若是s == NORMAL爲true,說明是正常執行結束,那麼根據上述的分析,在正常執行結束時outcome存放的是返回結果,把outcome返回;
若是s >= CANCELLED爲true,說明是被取消了,拋出CancellationException;
若是s < CANCELLED,那麼狀態只能是是EXCEPTIONAL,表示在執行過程當中出現了異常,拋出ExecutionException。
runAndReset方法
該方法和run方法相似,區別在於這個方法不會設置任務的執行結果值,因此在正常執行時,不會修改state,除非發生了異常或者中斷,最後返回是否正確的執行並復位:
```
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 不獲取和設置返回值
c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 是否正確的執行並復位 return ran && s == NEW; } ``` 總結 本文分析了FutureTask的執行過程和獲取返回值的過程,要注意如下幾個地方: FutureTask是線程安全的,在多線程下任務也只會被執行一次; 注意在執行時各類狀態的切換; get方法調用時,若是任務沒有結束,要阻塞當前線程,法阻塞的線程會保存在一個Treiber Stack中; get方法超時功能若是超時未獲取成功,會拋出TimeoutException; 注意在取消時的線程中斷,在run方法中必定要保證結束時的狀態是INTERRUPTED,不然在cancel方法中可能沒有執行interrupt,形成中斷的泄露。 部分摘自: 《JDK1.8源碼》 http://www.ideabuffer.cn/2017/04/06/FutureTask%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/ 《java併發編程的藝術》複製代碼