概述node
FutureTask 是一個可取消的、異步執行任務的類,它的繼承結構以下:segmentfault
它實現了 RunnableFuture 接口,而該接口又繼承了 Runnable 接口和 Future 接口,所以 FutureTask 也具備這兩個接口所定義的特徵。FutureTask 的主要功能:安全
1. 異步執行任務,而且任務只執行一次;多線程
2. 監控任務是否完成、取消任務;併發
3. 獲取任務執行結果。app
下面分析其代碼實現。less
代碼分析異步
分析 FutureTask 的代碼以前,先看下它實現的接口。RunnableFuture 接口定義以下:flex
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
ui
RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,而 Runnable 接口只有一個 run 方法,這裏再也不贅述。下面分析 Future 接口。
Future 接口
Future 接口方法定義以下:
主要方法分析:
/*
* 嘗試取消執行任務。若任務已完成、已取消,或者因爲其餘某些緣由沒法取消,則嘗試失敗。
* 若成功,且調用該方法時任務未啓動,則此任務不會再運行;
* 若任務已啓動,則根據參數 mayInterruptIfRunning 決定是否中斷該任務。
*/
boolean cancel(boolean mayInterruptIfRunning);
// 若該任務正常結束以前被取消,則返回 true
boolean isCancelled();
/*
* 若該任務已完成,則返回 true
* 這裏的「完成」,多是因爲正常終止、異常,或者取消,這些狀況都返回 true
*/
boolean isDone();
// 等待計算完成(若是須要),而後獲取結果
V get() throws InterruptedException, ExecutionException;
// 若是須要,最多等待計算完成的給定時間,而後檢索其結果(若是可用)
// PS: 該方法與前者的區別在於加了超時等待
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
FutureTask 代碼分析
任務的狀態變量:
// 任務的狀態
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
其中 state 表示任務的狀態,總共有 7 種,它們之間的狀態轉換可能有如下 4 種狀況:
1. 任務執行正常:NEW -> COMPLETING -> NORMAL
2. 任務執行異常:NEW -> COMPLETING -> EXCEPTIONAL
3. 任務取消:NEW -> CANCELLED
4. 任務中斷:NEW -> INTERRUPTING -> INTERRUPTED
示意圖:
在分析其餘成員變量以前,先看一個內部嵌套類 WaitNode:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
代碼比較簡單,就是對 Thread 的封裝,能夠理解爲單鏈表的節點。
其餘成員變量:
/** The underlying callable; nulled out after running */
// 提交的任務
private Callable<V> callable;
/** The result to return or exception to throw from get() */
// get() 方法返回的結果(或者異常)
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
// 執行任務的線程
private volatile Thread runner;
/** Treiber stack of waiting threads */
// 等待線程的 Treiber 棧
private volatile WaitNode waiters
其中 waiters 是一個 Treiber 棧,簡單來講,就是由單鏈表組成的線程安全的棧,如圖所示:
構造器
// 建立一個 FutureTask 對象,在運行時將執行給定的 Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// 建立一個 FutureTask,在運行時執行給定的 Runnable,
// 並安排 get 將在成功完成時返回給定的結果
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
這兩個構造器分別傳入 Callable 對象和 Runnable 對象(適配爲 Callable 對象),而後將其狀態初始化爲 NEW。
run: 執行任務
public void run() {
// 使用 CAS 進行併發控制,防止任務被執行屢次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調用 Callable 的 call 方法執行任務
result = c.call();
ran = true;
} catch (Throwable ex) {
// 異常處理
result = null;
ran = false;
setException(ex);
}
// 正常處理
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 線程被中斷
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set & setException: 更新狀態值,喚醒棧中等待的線程
protected void set(V v) {
// CAS 將 state 修改成 COMPLETING,該狀態是一箇中間狀態
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 輸出結果賦值
// 將 state 更新爲 NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
// CAS 將 state 修改成 COMPLETING,該狀態是一箇中間狀態
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t; // 輸出結果賦值
// 將 state 更新爲 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
這兩個方法的操做相似,都是更新 state 的值並給返回結果 outcome 賦值,而後執行結束操做 finishCompletion 方法:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 將 waiters 置空
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 喚醒 WaitNode 封裝的線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion 方法的做用就是喚醒棧中全部等待的線程,並清空棧。其中的 done 方法實現爲空:
protected void done() { }
子類能夠重寫該方法實現回調功能。
get: 獲取執行結果
// 獲取執行結果(阻塞式)
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 若任務未執行完,則等待它執行完成
if (s <= COMPLETING)
// 任務未完成
s = awaitDone(false, 0L);
// 封裝返回結果
return report(s);
}
// 獲取執行結果(有超時等待)
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
這兩個方法都是獲取任務執行的結果,原理也基本同樣,區別在於後者有超時等待(超時會拋出 TimeoutException 異常)。
awaitDone: 等待任務執行完成
// Awaits completion or aborts>private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 響應線程中斷
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// s > COMPLETING 表示任務已執行完成(包括正常執行、異常等狀態)
// 則返回對應的狀態值
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// s == COMPLETING 是一箇中間狀態,表示任務還沒有完成
// 這裏讓出 CPU 時間片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 執行到這裏,表示 s == NEW,將當前線程封裝爲一個 WaitNode 節點
else if (q == null)
q = new WaitNode();
// 這裏表示 q 並未入棧,CAS 方式將當 WaitNode 入棧
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 有超時的狀況
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 將當前線程掛起
else
LockSupport.park(this);
}
}
該方法的主要判斷步驟以下:
1. 若線程被中斷,則響應中斷;
2. 若任務已完成,則返回狀態值;
3. 若任務正在執行,則讓出 CPU 時間片;
4. 若任務未執行,則將當前線程封裝爲 WaitNode 節點;
5. 若 WaitNode 未入棧,則執行入棧;
6. 若已入棧,則將線程掛起。
以上步驟是循環執行的,其實該方法的主要做用就是:當任務執行完成時,返回狀態值;不然將當前線程掛起。
removeWaiter: 移除棧中的節點
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart> for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
report 方法:封裝返回結果
private V report(int s) throws ExecutionException {
Object x = outcome; // 輸出結果賦值
// 正常結束
if (s == NORMAL)
return (V)x;
// 取消
if (s >= CANCELLED)
throw new CancellationException();
// 執行異常
throw new ExecutionException((Throwable)x);
}
該方法就是對返回結果的包裝,不管是正常結束或是拋出異常。
cancel: 取消任務
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 若容許中斷,則嘗試中斷線程
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
場景舉例
FutureTask 適合多線程執行一些耗時的操做,而後獲取執行結果。下面結合線程池簡單分析其用法,示例代碼以下(僅供參考):
public class FutureTaskTest {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<FutureTask<Integer>> taskList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int finalI = i;
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
// 模擬耗時任務
TimeUnit.SECONDS.sleep(finalI * 2);
System.out.println(Thread.currentThread().getName() + " 計算中……");
return finalI * finalI;
});
taskList.add(futureTask);
executorService.submit(futureTask); // 提交到線程池
}
System.out.println("任務所有提交,主線程作其餘操做");
// 獲取執行結果
for (FutureTask<Integer> futureTask : taskList) {
Integer result = futureTask.get();
System.out.println("result-->" + result);
}
// 關閉線程池
executorService.shutdown();
}
}
小結
FutureTask 是一個封裝任務(Runnable 或 Callable)的類,能夠異步執行任務,並獲取執行結果,適用於耗時操做場景。
參考連接:
http://www.hchstudio.cn/article/2017/2b8f/
https://segmentfault.com/a/1190000016572591
https://www.jianshu.com/p/43dab9b7c25b