多線程高併發編程(7) -- Future源碼分析

一.概念

  A Future計算的結果。 提供方法來檢查計算是否完成,等待其完成,並檢索計算結果。 結果只能在計算完成後使用方法get進行檢索,若有必要,阻塞,直到準備就緒。 取消由cancel方法執行。 提供其餘方法來肯定任務是否正常完成或被取消。 計算完成後,不能取消計算。 若是您想使用Future ,以便不可撤銷,但不提供可用的結果,則能夠聲明Future<?>表格的類型,並返回null做爲基礎任務的結果。 node

public interface Future<V> {
    //嘗試取消執行此任務。若是任務已經完成,已經被取消或因爲某些其餘緣由而沒法取消,則此嘗試將失敗。
    //若是成功,而且在調用 cancel 時此任務還沒有開始,則該任務永遠沒法運行。
    //若是任務已經開始,則 mayInterruptIfRunning 參數肯定是否應中斷執行該任務的線程以嘗試中止該任務。
    //mayInterruptIfRunning == true, 表示中斷執行中的線程,false 表示讓線程正常完成
    boolean cancel(boolean mayInterruptIfRunning);
    //若是此任務在正常完成以前被取消,則返回true。
    boolean isCancelled();
    //若是此任務完成,則返回true。完成多是因爲正常終止,異常或取消引發的,在全部這些狀況下,此方法都將返回true。
    boolean isDone();
    //必要時等待計算完成,而後檢索其結果
    V get() throws InterruptedException, ExecutionException;
    //必要時最多等待給定時間以完成計算,而後檢索其結果(若是有)。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  

  Future是一個接口,提供了方法來檢測當前的任務是否已經結束,還能夠等待任務結束而且拿到一個結果,經過調用Future的get()方法能夠當任務結束後返回一個結果值,若是工做沒有結束,則會阻塞當前線程,直到任務執行完畢;能夠經過調用cancel()方法來中止一個任務,若是任務已經中止,則cancel()方法會返回true;若是任務已經完成或者已經中止了或者這個任務沒法中止,則cancel()會返回一個false。當一個任務被成功中止後,他沒法再次執行。isDone()和isCancel()方法能夠判斷當前工做是否完成和是否取消。  多線程

  類圖結構:

 

  • ScheduledFuture:這個接口表示一個延時的行爲能夠被取消。一般一個安排好的future是定時任務SchedualedExecutorService的結果;
  • RunnableFuture: 這個接口同時繼承Future接口和Runnable接口,在成功執行run()方法後,能夠經過Future訪問執行結果;
  • ForkJoinTask:基於任務的抽象類,能夠經過ForkJoinPool來執行。一個ForkJoinTask是相似於線程實體,可是相對於線程實體是輕量級的。大量的任務和子任務會被ForkJoinPool池中的真實線程掛起來,以某些使用限制爲代價;
  • CompletableFuture:一個Future類是顯示的完成,並且能被用做一個完成等級,經過它的完成觸發支持的依賴函數和行爲。當兩個或多個線程要執行完成或取消操做時,只有一個可以成功;
  • RunnableScheduledFuture:執行延遲和週期性任務;在成功執行run()方法後,能夠經過Future訪問執行結果;
  • FutureTask:可取消的異步計算
    • 該類提供了一個Future的基本實現 ,具備啓動和取消計算的方法,查詢計算是否完整,並檢索計算結果。結果只能在計算完成後才能檢索; 若是計算還沒有完成,則get方法將阻止。一旦計算完成,則沒法從新啓動或取消計算(除非使用runAndReset()調用計算 );
    • A FutureTask可用於包裝Callable或Runnable對象。 由於FutureTask實現Runnable ,一個FutureTask能夠提交到一個Executor執行;
  • RecursiveTask:遞歸結果ForkJoinTask;
  • RecursiveAction:遞歸結果ForkJoinTask;

二.用法

   一個場景,咱們要學習作飯,那麼咱們須要準備廚具和食材,廚具經過電子商務網購,食材去菜市場挑選。那麼可使用多線程來併發進行,即咱們能夠先網購下單,在等待快遞員送貨過來的這段時間去菜市場買食材,節省時間,提升效率。併發

  1. 直接開啓線程,使用類繼承Thread重寫方法實現網購,join阻塞直到廚具到達纔開始作飯。
    public class FutureTest {
        public static void main(String[] args) throws InterruptedException {
            long startTime = System.currentTimeMillis();
            OnlineShopping shopping = new OnlineShopping();
            shopping.start();
            Thread.sleep(2000);//等待送貨執行完
            System.out.println("第二步:食材到位");
            shopping.join();//阻塞訂單直到快遞送到獲得廚具
            System.out.println("第三步:開始廚藝");
            System.out.println("總共用時:" + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        static class OnlineShopping extends Thread {
            @Override
            public void run() {
                System.out.println("第一步:下單");
                System.out.println("第一步:等待送貨");
                try {
                    Thread.sleep(5000);//送貨中
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第一步:快遞送到");
            }
        }
    }
    //======結果======
    第一步:下單
    第一步:等待送貨
    第二步:食材到位
    第一步:快遞送到
    第三步:開始廚藝
    總共用時:5003ms 
  2. 使用Future模式來完成上述操做,經過Callable返回結果來獲取廚具,能夠經過FutureTask靈活地操做訂單,因而可知,比繼承Thread完成的訂單,Future模式更具備靈活性,
    public class FutureTest {
        public static void main(String[] args) throws Exception {
            long startTime = System.currentTimeMillis();
            Callable<String> shopping = () ->{
                System.out.println("第一步:下單");
                System.out.println("第一步:等待送貨");
                Thread.sleep(5000);//快遞員送貨中
                System.out.println("第一步:快遞送到");
                return "廚具到達";
            };
            FutureTask<String> task = new FutureTask<>(shopping);
            new Thread(task).start();
            Thread.sleep(2000);//保證下單操做執行到「等待送貨」中
            System.out.println("第二步:食材到位");
            if (!task.isDone()) {  // 聯繫快遞員,詢問是否到貨
                System.out.println("第三步:廚具還沒到,心情好就等着(心情很差就調用cancel方法取消訂單)");
            }
            String chuju = task.get();//獲得廚具
            System.out.println("第三步:開始廚藝");
            System.out.println("總共用時:" + (System.currentTimeMillis() - startTime) + "ms");
        }
    }
    //======結果======
    第一步:下單
    第一步:等待送貨
    第二步:食材到位
    第三步:廚具還沒到,心情好就等着(心情很差就調用cancel方法取消訂單)
    第一步:快遞送到
    第三步:開始廚藝
    總共用時:5048ms 

三.分析

  使用Future模式的三部曲:app

  1. 建立Callable重寫call方法,把網購邏輯封裝到call中,返回定義結果「廚具」;
    public interface Callable<V> {
        V call() throws Exception;
    }
  2. 建立FutureTask,把Callable實例放入FutureTask的構造方法中;
    public class FutureTask<V> implements RunnableFuture<V>{
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // 確保callable的可見性
        }
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // 確保callable的可見性
        }
    }
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }

     FutureTask的run方法:Callable的call是被FutureTask的run方法調用的,不是異步運行的;

    public void run() {
            // 1. 若是 state !=  NEW 說明 run 方法已經運行過,直接 return
            // 2. 若是 state == NEW && CAS 競爭 設置 runner 失敗,說明已經有別的線程在運行,直接 return
            // NEW 的狀態由構造方法初始化,runner 是運行該 Callable 的線程
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;// 這裏的callable是從構造方法裏面傳人的
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;// 標識符
                    try {
                        result = c.call();//得到結果
                        ran = true;
                    } catch (Throwable ex) {//異常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)//成功沒有異常,設置返回值
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                //在狀態設置以前,runner必須是非空的,以防止對run()的併發調用
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                //爲防止泄漏中斷,必須在空runner以後將狀態設置爲重複讀
                int s = state;
                // 若是最終狀態 >= INTERRUPTING,則處理中斷
                // cancel 方法會經過參數 mayInterruptIfRunning 來設置 state 的值
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

     狀態屬性state:

        private volatile int state;//狀態,volatile讓狀態可見性
        private static final int NEW          = 0;//構造方法建立時的狀態
        private static final int COMPLETING   = 1;//這是一箇中間態,完成時和出現異常時有使用到
        private static final int NORMAL       = 2;//完成運行時的最終狀態
        private static final int EXCEPTIONAL  = 3;//異常時的最終狀態
        private static final int CANCELLED    = 4;//已取消
        private static final int INTERRUPTING = 5;//中斷中
        private static final int INTERRUPTED  = 6;//已中斷
        
        可能的 state 轉換:
        NEW -> COMPLETING -> NORMAL
        NEW -> COMPLETING -> EXCEPTIONAL
        NEW -> CANCELLED
        NEW -> INTERRUPTING -> INTERRUPTED

    set設置返回值:

     private Object outcome;//經過get方法得到的返回值
        //設置返回值,狀態NEW -> COMPLETING -> NORMAL
        protected void set(V v) {
             // 這裏爲何要用 CAS 由於可能會和 cancel 方法產生競爭。
            // 若是競爭失敗,說明取消競爭成功,在 cancel 方法承擔喚醒的工做,因此直接跳過。
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//NEW -> COMPLETING
                // 競爭成功
                outcome = v;//outcome爲返回結果
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀態爲NORMAL,COMPLETING -> NORMAL
                finishCompletion();
            }
        }

    setException運行時異常:

    //狀態:NEW -> COMPLETING -> EXCEPTIONAL
    protected void setException(Throwable t) {
            // 這裏爲何要用 CAS 由於可能會和 cancel 方法產生競爭。
            // 若是競爭失敗,說明取消競爭成功,在 cancel 方法承擔喚醒的工做,因此直接跳過。
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//NEW -> COMPLETING
                // 競爭成功
                outcome = t; // outcome 爲一個 Throwable
                // 把最終狀態改成 EXCEPTIONAL
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state,COMPLETING -> EXCEPTIONAL
                finishCompletion();
            }
        }

    finishCompletion:

        //刪除當前線程並喚醒全部等待線程,調用done(),並取消進行中的方法
        private void finishCompletion() {
            // assert state > COMPLETING;
            //從 waiters 末尾開始遍歷,for 自旋直到 CAS 成功。
            for (WaitNode q; (q = waiters) != null;) {
                // 使用 CAS 把 waiters 設置爲 null,和 awaitDone 和 removeWatier 方法競爭
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    // 自旋喚醒全部線程
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);// 喚醒線程
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
            // 這個一個空方法,主要用於繼承重寫,這也是模板模式的體現
            done();
            callable = null;        // to reduce footprint
        }
        protected void done() { }
    
    
        //===========例子==============
        //ExecutorCompletionService 的做用就是把線程池的執行結果放到一個已完成隊列中,方便獲取執行結果,其內部主要經過一個 FutureTask 的實現類 QueueingFuture 來實現這個功能:
        private class QueueingFuture extends FutureTask<Void> {
                QueueingFuture(RunnableFuture<V> task) {
                    super(task, null);
                    this.task = task;
                }
                protected void done() { completionQueue.add(task); }//done方法是FutureTask方法的重寫。FutureTask在完成時會執行done方法,把task放入已完成隊列completionQueue。
                private final Future<V> task;
            }

    get得到返回結果:

        public V get() throws InterruptedException, ExecutionException {
            int s = state;//獲得狀態
            if (s <= COMPLETING)//狀態未完成,把獲取結果的線程放入等待鏈表,而後阻塞,直至被中斷、完成或出現異常。
                s = awaitDone(false, 0L);
            return report(s);//返回結果
        }
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;//用於計時
            WaitNode q = null;
            boolean queued = false;
            for (;;) {//自旋
                //若是已經被中斷,則removeWaiter,拋出中斷異常
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                int s = state;
                if (s > COMPLETING) {//task已經結束
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                //第二個線程進來,正在運行,發現前面有等待節點,則讓出cpu
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                // 第一次遍歷,初始化 WaitNode,第一個節點進來時進行,第一個線程狀態是new
                else if (q == null)
                    q = new WaitNode();
                 // 是否已入隊,沒有則把WaitNode接到末尾,第一個線程第二次遍歷時運行下面代碼
                else if (!queued)
                    // 和 finishCompletion 和 removeWaiter 競爭
                    // 1. finishCompletion競爭成功,說明state已經 > COMPLETING則下次循環就會退出
                    // 2. removeWaiter競爭成功,說明waiters變化了,下一次循環再次競爭
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                // 若是使用了計時,則判斷是否超時,若是超時則移出WaitNode並當即返回無需等待結果,不然阻塞 nanos
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    //阻塞,直到被喚醒(正常完成 || 異常 || 中斷)
                    LockSupport.park(this);
            }
        }
        //根據awaitDone返回狀態返回結果或拋出異常
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)//正常
                return (V)x;
            if (s >= CANCELLED)//取消
                throw new CancellationException();
            // task 執行過程當中出現異常
            throw new ExecutionException((Throwable)x);
        }

    removeWaiter:

        /**
             * Tries to unlink a timed-out or interrupted wait node to avoid
             * accumulating garbage.  Internal nodes are simply unspliced
             * without CAS since it is harmless if they are traversed anyway
             * by releasers.  To avoid effects of unsplicing from already
             * removed nodes, the list is retraversed in case of an apparent
             * race.  This is slow when there are a lot of nodes, but we don't
             * expect lists to be long enough to outweigh higher-overhead
             * schemes.
             *嘗試取消連接超時或中斷的等待節點以免堆積垃圾。內部節點的拼接沒有CAS,
             *由於這對釋放者不管如何遍歷都沒有影響。 爲了不已刪除節點節點未拼接的影響,
             *若是出現明顯的競爭,則從新遍歷列表。 當節點不少時會很慢,可是咱們不
             *指望列表足夠長以抵消較高的開銷計劃。
             */
        private void removeWaiter(WaitNode node) {
            if (node != null) {
                node.thread = null;
                retry:
                for (;;) {          // restart on removeWaiter race
                    //遍歷整個鏈表
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                        s = q.next;
                        //把q看成前一個節點,遍歷下一個節點
                        if (q.thread != null)
                            pred = q;
                         // q.thread == null && pred != null,表示當前節點不是第一個節點,是一箇中間節點
                         // 這裏沒有使用 CAS,若是出現多個線程同時遍歷,前一個節點變爲null,則從新從頭遍歷
                         // 爲何沒有使用 CAS 由於做者的想法是這個鏈表不會太長,因此咱們使用時不該該使這個鏈表太長
                         // 操做:把下一個節點鏈接到前一個節點的後面
                        else if (pred != null) {
                            pred.next = s;//把s鏈接到pred後面
                            if (pred.thread == null) // check for race
                                continue retry;
                        }
                        // q.thread == null && pred == null,表示第一個節點的 thread == null,
                        // 這裏使用 CAS,由於可能多個線程在操做
                        // 操做:把下一個節點設置爲末尾節點,若是競爭失敗則從新從頭遍歷
                        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                              q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }

     isDone:

        public boolean isDone() {
            return state != NEW;
        }
  3.  建立Thread,把FutureTask實例放入構造方法中,start開啓線程less

 參考:https://www.jianshu.com/p/414cc2f0002c異步

相關文章
相關標籤/搜索