可取消的異步計算。該類提供了 Future的基本實現,其中包括啓動和取消計算的方法,查詢計算是否完成以及檢索計算結果的方法。只有在計算完成後才能檢索結果;若是計算尚未完成,{getcode}方法將會被阻塞。一旦計算完成,計算不能被從新啓動或取消(除非計算是使用調用的runAndReset()。java
該類實現自RunableFuture接口,其中RunableFuture接口又繼承自Runable和Future。因此能夠理解爲:FutureTask是一個能夠計算Future結果的一個Future實現,git
因爲FutureTask間接或直接實現了Runable和Future接口,因此其具備以下特徵:github
能夠像一個普通的任務同樣,使用線程池提交一個任務並執行。數據庫
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return 100; } }));
能夠像一個普通的任務同樣,使用Thread來執行,但能夠異步獲取結果。緩存
FutureTask futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return 100; } }); new Thread(futureTask).start(); futureTask.get();
考慮一種使用Cache的場景:通常狀況下,對於熱點數據咱們都會使用cache保存數據,只有當cache失效了,纔會進行耗時的網絡調用或者數據庫查詢。可是當cache失效時,同時有多個該key的查詢,那麼在短期內可能會有多個相同的耗時查詢,瞬間對系統性能會有必定的損失,爲了解決這種狀況能夠採起緩存FutureTask的方式解決:網絡
思路借鑑:https://github.com/javacreed/...異步
//獲取緩存的客戶端 public class CacheClient { public static <T> T getCache(int id){ return null; } } //Service層邏輯 public class CacheService { private static ConcurrentMap<Integer,FutureTask<User>> cacheFuture = new ConcurrentHashMap<>(); public User getUserInfo(int id) { Future<User> future = createFutureIfAbsent(id); try { return future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } private Future<User> createFutureIfAbsent(final int id) { Future<User> future = cacheFuture.get(id); if (future == null) { FutureTask<User> futureTask = new FutureTask<User>(new Callable<User>() { @Override public User call() throws Exception { return CacheClient.getCache(id); } }); future = cacheFuture.putIfAbsent(id, futureTask); if (future == null) { future = futureTask; futureTask.run(); } } return future; } public class User { private int id; private String name; private String age; 。。。 } }
FutureTask做爲一個可運行的Future,其運行過程當中存在狀態的遷移過程,FutureTask的運行狀態有:ide
狀態躍遷:性能
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
FutureTask有兩個構造方法,雖然兩個構造方法的入參略有不一樣,可是在底層執行時都是按照Callback任務來構建的。並在此過程初始化當前的任務狀態爲:NEWthis
下面將從核心方法開始,逐漸分析FutureTask的原理:
run():任務執行
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
該方法的邏輯很簡單,主要完成了以下任務:
1.首先判斷任務的有效性:1)該任務的狀態是否爲初始狀態:NEW,2)把運行任務的線程設置給成員變量runner。
2.執行任務。
3.根據執行結果設置狀態。
get()/get(long timeout, TimeUnit unit)
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }```
該方法的邏輯更簡單:首先判斷當前的狀態,而後就會調用awaitDone()方法等待結果,當等待超時就會拋出TimeOutException,不然調用report()將結果報告出去。下面看看等待結果是如何處理的:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //首先計算出該任務的最終結束時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //判斷當前線程是否被中斷 if (Thread.interrupted()) { //從等待隊列中刪除該線程的等待節點 removeWaiter(q); throw new InterruptedException(); } int s = state; //若是狀態>COMPLETING,說明任務已經結束了,無論是否正常結束,都是能夠返回的 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //若是當前狀態仍是COMPLETING,說明結果來沒有返回呢,那就讓出CPU else if (s == COMPLETING) // cannot time out yet Thread.yield(); //若是當前任務尚未生成等待節點,那麼就建立一個以當前線程的等待節點。 else if (q == null) q = new WaitNode(); else if (!queued) //採用頭插法構建等待隊列 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //任務執行超時了,那麼就刪除等待隊列 if (nanos <= 0L) { removeWaiter(q); return state; } //尚未超時,那麼就將當前線程park LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }
該方法雖然篇幅很大,可是完成的任務也是很簡單的,主要能夠總結以下:
NOTE:爲何要使用這個waiter
?[單獨文章分析:]