FutureTask

FutureTask

What is it


​ 可取消的異步計算。該類提供了 Future的基本實現,其中包括啓動和取消計算的方法,查詢計算是否完成以及檢索計算結果的方法。只有在計算完成後才能檢索結果;若是計算尚未完成,{getcode}方法將會被阻塞。一旦計算完成,計算不能被從新啓動或取消(除非計算是使用調用的runAndReset()。java

​ 該類實現自RunableFuture接口,其中RunableFuture接口又繼承自Runable和Future。因此能夠理解爲:FutureTask是一個能夠計算Future結果的一個Future實現,git

How to use


因爲FutureTask間接或直接實現了Runable和Future接口,因此其具備以下特徵:github

  • 能夠像一個普通的任務同樣,使用線程池提交一個任務並執行。數據庫

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.submit(new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
              return 100;
        }
    }));
  • 能夠像一個普通的任務同樣,使用Thread來執行,但能夠異步獲取結果。緩存

    FutureTask futureTask = new FutureTask<Integer>(new Callable<Integer>() {
       @Override
       public Integer call() throws Exception {
          return 100;
       }
    });
    new Thread(futureTask).start();
    futureTask.get();

When to use


考慮一種使用Cache的場景:通常狀況下,對於熱點數據咱們都會使用cache保存數據,只有當cache失效了,纔會進行耗時的網絡調用或者數據庫查詢。可是當cache失效時,同時有多個該key的查詢,那麼在短期內可能會有多個相同的耗時查詢,瞬間對系統性能會有必定的損失,爲了解決這種狀況能夠採起緩存FutureTask的方式解決:網絡

思路借鑑:https://github.com/javacreed/...異步

//獲取緩存的客戶端
public class CacheClient {
    public static <T> T getCache(int id){
        return null;
    }
}
//Service層邏輯
public class CacheService {
    private static ConcurrentMap<Integer,FutureTask<User>> cacheFuture = new ConcurrentHashMap<>();
    public User getUserInfo(int id) {
        Future<User> future = createFutureIfAbsent(id);
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
    private Future<User> createFutureIfAbsent(final int id) {
        Future<User> future = cacheFuture.get(id);
        if (future == null) {
            FutureTask<User> futureTask = new FutureTask<User>(new Callable<User>() {
                @Override
                public User call() throws Exception {
                    return CacheClient.getCache(id);
                }
            });
            future = cacheFuture.putIfAbsent(id, futureTask);
            if (future == null) {
                future = futureTask;
                futureTask.run();
            }
        }
        return future;
    }
    public class User {
        private int id;
        private String name;
        private String age;
        。。。
    }
}

How to design

狀態機

​ FutureTask做爲一個可運行的Future,其運行過程當中存在狀態的遷移過程,FutureTask的運行狀態有:ide

  • NEW:初始狀態。
  • COMPLETING:結果正在被set過程當中。
  • NORMAL:任務正常執行結束。
  • EXCEPTIONAL:任務執行過程當中發生異常。
  • CANCELLED:任務執行過程當中被取消。
  • INTERRUPTING:任務即將被中斷。
  • INTERRUPTED:任務已經被中斷。

    狀態躍遷:性能

  • 正常結束:NEW->COMPLETING->NORMAL
  • 出現異常:NEW->COMPLETING->EXCEPTIONAL
  • 任務被取消且不響應中斷:NEW->CANCELLED
  • 任務被取消且響應中斷:NEW->INTERRUPTING->INTERRUPTED
成員變量
  • state:指示當前任務執行的狀態。
  • callback:須要被運行的任務。運行完成後將會清空。
  • outcome:保存任務執行以後的結果。
  • runner:持有任務執行過程當中運行線程。
  • waiters:等待線程的堆棧[稍後將作詳細分析]。
構造方法
public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}

FutureTask有兩個構造方法,雖然兩個構造方法的入參略有不一樣,可是在底層執行時都是按照Callback任務來構建的。並在此過程初始化當前的任務狀態爲:NEWthis

核心方法

下面將從核心方法開始,逐漸分析FutureTask的原理:

  • run():任務執行

    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    該方法的邏輯很簡單,主要完成了以下任務:

    1.首先判斷任務的有效性:1)該任務的狀態是否爲初始狀態:NEW,2)把運行任務的線程設置給成員變量runner。

    2.執行任務。

    3.根據執行結果設置狀態。

  • get()/get(long timeout, TimeUnit unit)

    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
     }```

該方法的邏輯更簡單:首先判斷當前的狀態,而後就會調用awaitDone()方法等待結果,當等待超時就會拋出TimeOutException,不然調用report()將結果報告出去。下面看看等待結果是如何處理的:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
          //首先計算出該任務的最終結束時間
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //判斷當前線程是否被中斷
            if (Thread.interrupted()) {
                  //從等待隊列中刪除該線程的等待節點
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //若是狀態>COMPLETING,說明任務已經結束了,無論是否正常結束,都是能夠返回的
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
              //若是當前狀態仍是COMPLETING,說明結果來沒有返回呢,那就讓出CPU
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //若是當前任務尚未生成等待節點,那麼就建立一個以當前線程的等待節點。
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
            //採用頭插法構建等待隊列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                  //任務執行超時了,那麼就刪除等待隊列
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                  //尚未超時,那麼就將當前線程park
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }

該方法雖然篇幅很大,可是完成的任務也是很簡單的,主要能夠總結以下:

  • 首先判斷在超時時間內,任務是否執行完成(失敗)。
  • 經過狀態爲判斷任務是否執行完成或失敗。

​ NOTE:爲何要使用這個waiter?[單獨文章分析:]

Conclusion
相關文章
相關標籤/搜索