Java多線程類FutureTask源碼閱讀以及淺析

  FutureTask是一個具體的實現類,實現了RunnableFuture接口,RunnableFuture分別繼承了Runnable和Future接口,所以FutureTask類既能夠被線程執行,又能夠拿到線程執行的結果。FutrueTask應用於多線程中異步處理並獲得處理結果的場景,好比:加入有個流程須要調用遠程接口拿到相關數據在本地進行處理,可是這個接口花費時間比較長。若是使用傳統的阻塞線程去處理的話,那麼就會一直阻塞在調用接口這裏,其它的事情都幹不了,這樣操做顯然效率相對較低的。所以,咱們可使用FutureTask來解決這個問題,FutureTask能夠異步調用遠端接口,那麼當前線程就能夠作與遠端接口無關的數據,左右開弓提升效率。java

  FutureTask UML類圖:設計模式

 

   FutureTask類簡單的使用示例:安全

 public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("異步處理");
            Thread.sleep(3000);
            return "ok";
        });
        new Thread(futureTask).start();
        System.out.println("同步處理其它事情");
        Thread.sleep(1000);
        System.out.println("等待異步處理結果:" + futureTask.get());
        System.out.println("處理完成");
    }

  

  1、成員變量

  FutureTask類有state,callable,outcome,runner和waiters 5個成員變量多線程

  

 

 

  1.state

    線程運行狀態,有如下幾種狀態:併發

      NEW:初始狀態,在初始化時的狀態,狀態值爲0;異步

      COMPLETING: 完成中狀態,run方法被調用時,對返回值進行賦值欠的狀態,值爲1;高併發

      NORMAL: 正常狀態,線程正常執行,在返回值被賦值被賦值成功後的狀態,值爲2;this

      EXCEPTIONAL:異常狀態,在執行用戶回調方式call的過程當中出現異常,值爲3;線程

      CANCELLED: 取消狀態,用戶調用cancel(false)方法時的狀態,值爲4;設計

      INTERRUPTING:打斷中狀態,用戶調用cancel(true)方法時的狀態,值爲5;

      INTERRUPTED: 被打斷狀態,用戶調用cancel(true)方法時,runner線程執行打斷方法完成後的狀態,值爲6;

 

    運行狀態轉換:

      NEW -> COMPLETING -> NORMAL

      NEW -> COMPLETING -> EXCEPTIONAL

      NEW -> CANCELLED

      NEW -> INTERRUPTING -> INTERRUPTED

    

 

 

  2.callable

  該成員變量用於異步執行用戶自定義業務代碼,當futureTask得到cpu時間片後調用run方法,在run方法中調用callable.call(),獲取到執行結果。

  

  3.outcome

  異步執行輸出結果,類型爲object。賦值時機時在callable.call()方式執行完成後。

 

  4.runner

  用於執行callable接口,在futureTask被cpu調度時會使用cas賦值爲當前線程。當前線程執行完成後設置爲null,等待gc回收。

 

  5.waiters

  內部類實現的單向鏈表,用於等待獲取執行結果。每次調用get()方法時都會將該線程放入等待隊列的頭部,當該線程被打斷後,或者get(timeout)方法過時後就會重這個等待隊列中移除。當callable.call()執行完成後會從頭部開始遍歷逐個喚醒等待線程,並將執行結果返回。

 

  2、核心方法

  1.run方法

  run方法間接實現於Runnable的接口,因此當futureTask線程得到cpu資源後會調用該方法。

  1.首先先判斷當前狀態是否爲初始化狀態,若是不是初始狀態直接結束該方法。不然使用cas方式給成員變量runner賦值,賦值爲當前線程。用cas方式可以保證多線程環境下賦值是線程安全的。不懂cas的同窗自行查閱相關資料。

  2.若是callable不爲null而且state狀態爲NEW,則執行callable.call()方法,並獲得該方法的返回值。

  3.若是執行call方法出現異常時,執行setException方法,該方法將state的NEW狀態使用cas方式修改成COMPLETING狀態,修改爲功後outcome設置爲當前拋出的異常,狀態再次改成EXCEPTIONAL狀態。而後將等待隊列中的線程都喚醒,並從隊列中移除。調用鉤子done()方法,將callable擲爲null。

  4.若是call方式執行成功,下一步則調用set方法,該方法首先將NEW狀態用cas修改爲COMPLETING狀態,修改爲功後將call執行結果賦值到outcome變量,COMPLETING狀態修改成NORMAL,喚醒等待線程並從隊列移除,調用狗子方法。

  5.執行finally代碼的代碼,將runner擲爲null,若是當前狀態爲打斷中,那麼會將當前資源讓出,直到線程最終被打斷。

 

 /**
     * Runnable#run();
     * 線程得到cpu資源後會執行該方法
     */
    public void run() {
        //判斷當前狀態是否是初始狀態
        //將runner賦值爲當前線程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //調用用戶業務流程
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //拋出異常,修改響應的狀態
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

  

/**
     * 1.修改狀態值  NEW  --> COMPLETING -->  EXCEPTIONAL
     * 2.移除並喚醒全部等待中的線程
     * @param t
     */
    protected void setException(Throwable t) {
        //將state修改成COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            //將state修改成EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //對返回值進行處理
            finishCompletion();
        }
    }

/**
     * 1.狀態值  NEW  --> COMPLETING  --> NORMAL
     * 2.設置執行結果值
     * 3.喚醒全部等待中的線程
     * @param v
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

  

  2.get() 和 get(long timeout,TimeUnit timeUnit);

  兩個方法的區別在於前者沒有超時時間,後者由超時時間,流程是基本差很少的。

  1.若是state爲COMPLETED,進入report方法,該方法會判斷當前的狀態爲NORMAL時將outcome返回,不然拋出異常。

  2.若是state不爲COMPLETED進入awaitDone方法。

  3.awaitDone方法顧名思義就是等待操做結果。方法裏面是一個死循環,在循環過程當中若是線程被打斷,就會拋出異常,並將剛建立的等待線程從隊列中移除。

  4.若是狀態已完成,將等待線程綁定的線程設爲null,並將狀態返回。

  5.若是當前狀態爲COMPLETING則將當前cpu資源讓出給其它線程。

  6.若是等待節點爲null,就建立一個新的節點,該節點綁定了當前的線程。

  7.若是新建立的節點尚未與等待隊列進行綁定,那麼就將該節點放入隊列頭部。

  8.若是調用的是由過時時間的方法,那麼判斷若是已經到期了則將該節點從隊列中移除,並返回狀態。不然進入有過時的等待。

  9.線程進入等待狀態,線程會阻塞在這裏,等待run方法執行完成後調用unPark方法。

  10.線程被喚醒後,進入report方法。

  

/**
     * @throws CancellationException {@inheritDoc}
     */
    /**
     * 獲取執行結果
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    /**
     * 獲取執行結果
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

  

  3.awaitDone(boolean timed, long nanos)

  該方法是等待完成,或者線程被打斷而拋出異常退出,有或者是通過nanos 這麼多納秒後退出。方法內部是一個死循環,經過各類條件判斷是否知足條件退出,不然線程進入等待狀態,直到被其餘線程喚醒。

  1.首先會判斷當前線程是否有打斷標記,若是被打斷過,刪除剛建立出來的等待節點,並拋出InterruptedException異常。

  2.若是當前任務是已完成狀態,直接將當前狀態返回。

  3.若是當前任務狀態爲完成中,說明其餘線程正在操做,當前線程無須要重複操做,只須要將cpu資源讓出來。

  4.若是前三個條件均未知足,則會建立等待節點,而後進入第二輪循環。

  5.第二輪循環,將第二輪循環建立的等待節點放入等待鏈表的頭部,並使用cas方式給waiters賦值,保證多線程下正常正確的賦值。

  6.第三輪循環,若是用戶調用的是有過時時間的get方法,則會計算當前剩餘時間,1)若是剩餘時間小於等於0,則說明已通過期,那麼就會移除當前等待中的節點,將當前任務狀態返回。2)不然調用LockSupport的有過時時間的parkNanos,該方法會讓線程進入等待狀態,也即線程會阻塞在這裏,過時時間不會超過用戶傳入的過時時間。若是用戶調用的是沒有過時時間的方法,那麼調用LockSupport的有無過時時間的parkNanos,該方法會讓線程無限的等待下午,知道有其餘線程將他喚醒。

  源碼:

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * 等待完成或打斷退出或超時退出
     * @param timed true if use timed waits 是否有超時時間
     * @param nanos time to wait, if timed 等待時間
     * @return state upon completion 狀態碼
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //返回true,線程被打斷過,但並不會直接拋出異常
            //而是等其餘線程將線程喚醒以後,發現該線程在等待過程當中執行了打斷操做
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //這裏的意思是任務已完成,又能夠能是正常結束,也有能夠能是用戶取消,或者異常,打斷
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //讓出cpu資源
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //第一次循環會進入這個條件建立節點
            else if (q == null)
                q = new WaitNode();
            //第二次循環給新建立的q節點放在waiters鏈表的頭部
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                //過時退出
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //有限時間掛起線程
                LockSupport.parkNanos(this, nanos);
            }
            else
                //無限時間掛起線程
                LockSupport.park(this);
        }
    }

  

  3、總結

  FutureTask是運用了高併發設計思想的Future設計模式。它很好的處理了高併發下處理多件獲取或建立數據並沒有相關聯的操做耗時長的問題。設計者能夠將耗時比較長的操做(好比遠程調用接口等)使用異步的方式(即建立一個新的線程)去處理,那麼主線程就能夠作其餘的事情了,這樣能夠大大減小總體的處理時間。這個模式適用於多個無關聯的時間,若是A操做的進行須要B操做的結果才能夠開始,那麼A實際上是一直帶阻塞等待B的結果的,這個串行執行的耗時差很少,使用future模式意義不大。

  FutureTask的get方法在用戶邏輯代碼未返回結果時仍而後進入阻塞,可是用戶業務代碼的執行並不受主線程(建立FutureTask的線程)的影響。咱們能夠經過重寫done方法來獲取到完成動做,這樣咱們再調用get方法時就不會阻塞。

  在現實生活中就有不少相似Future模式的例子。好比你的生日快到了,你須要去蛋糕店訂蛋糕,同時還須要買其餘的禮品,開party所需的東西等,假設蛋糕店製做蛋糕須要花費1個小時,購買其餘物品須要2小時。用傳統的串行的方式就是你去蛋糕店跟老闆說你要訂蛋糕,老闆根據你的需求開始製做蛋糕,你就在店裏坐着等製做完成。1個小時後終於製做好蛋糕了,而後你才能拿着蛋糕去買其餘東西,買完其餘東西有須要耗費2小時,最後你總共花費了3小時。當使用Future模式時,你事先寫好你須要訂多大的,什麼口味的蛋糕,而後去到蛋糕跟老闆說你先去買其餘東西,一會再過來拿。可是你忘記留聯繫方式給蛋糕店老闆了(沒有重寫done方法),因此你並不知道蛋糕何時作好,提早過去拿,那你還得在店裏等蛋糕作好。若是重寫了done方法,至關於給店老闆留了電話號碼,等蛋糕作好老闆就會打電話給你,你過拿蛋糕時就不會說太早過去要等一會或太晚過去了。咱們用最壞的狀況來計算,你買其餘東西花了兩個小時,製做蛋糕花了1個小時。因爲製做蛋糕和你沒其餘東西是分開同時進行的,因此最終你只花了2個小時,比串行的方式快了1個小時。

  以上就是我在看FutureTask源碼過程當中的總結,若有錯漏歡迎提出。

相關文章
相關標籤/搜索