java多線程系列之future機制

java多線程系列之future機制

future是什麼?

  • 在執行比較耗時的任務的時候,咱們常常會採起新開線程執行的方式,好比在netty中,若是在io線程中處理耗cpu的計算任務,那麼就會形成io線程的堵塞,致使吞吐率的降低(比較好理解,原本io線程能夠去處理io的,如今卻在等待cpu執行計算任務),這嚴重影響了io的效率。
  • 通常咱們採用線程池來執行異步任務,通常狀況下不須要獲取返回值,可是特殊狀況下是須要獲取返回值的,也就是須要拿到異步任務的執行結果,舉個例子來講:對大量整數進行求和,若是採用多線程來求解,就須要一個彙總線程和多個計算線程,計算線程執行具體的計算任務而且返回求和值,彙總線程進行多個求和值最後的彙總。
  • 那麼若是咱們本身要實現這個異步計算的程序的話能夠採用什麼方式呢?這其實是線程之間的通訊機制,即咱們的彙總線程須要拿到全部計算線程執行完畢的結果,那麼咱們能夠採用共享內存來實現,定義一個全局的map,每一個計算線程執行完畢的結果都放到到map中,而後彙總線程從全局map中取出結果進行累加彙總,這樣就搞定了,這裏面雖然思想很簡單,可是仍是有一些細節須要考慮的,好比彙總線程怎麼判斷全部的任務都執行完畢呢?能夠經過計算任務的總數和已經完成計算任務的數目進行比較。總之咱們確定能夠實現一套這樣的異步計算框架。
  • 那麼進一步抽象,在上面的實現過程當中,實際上咱們關心的就是每一個任務執行的結果,以及任務是否執行完畢,對應到上面提到的計算框架,就是咱們關心是否計算完畢和計算完畢後的值,有了這兩部分的值,咱們的彙總線程就可以很方便的進行計算總的結果了。
  • 其實仔細觀察,對於幾乎全部的異步執行線程,咱們都是關心這兩部分值的,即任務是否執行完畢,執行完後的結果(若是不須要結果能夠返回null),那麼這兩部分的東西確定能夠抽象出來,避免咱們每次編寫線程執行的run方法的時候都要本身提交結果和設置完成標誌,因而java就是設計了這麼一套future機制來幫助開發者

上面就是我結合本身的理解分析的future機制的設計思想,可能說的不夠全,但願有人能夠補充。下面會講解java future的具體實現java

總結一句話:咱們須要異步執行任務而且知道異步任務的執行結果和執行狀態,咱們能夠本身來實現,可是因爲這部分比較通用,因此java經過一種future機制來爲咱們實現了這些功能,這就是future。

下面分析java裏面future機制的具體實現

  • execute方式:咱們知道一個類若是實現了runnable接口,它就可以被線程來執行,由於實現了runnable接口就擁有了run方法,因此可以被執行。因此最簡單的異步線程執行方式以下:利用Executors框架來建立一個線程池,而後調用execute方法來提交異步任務,注意這裏的execute方法是沒有返回的,也就是說咱們無法知道提交的任務的執行結果。node

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(()->System.out.println("異步執行!"));複製代碼
  • submit方式:前面提到的java給咱們提供的線程池接口ExecutorService提供了兩種提交異步任務的方式,一種就是沒有返回值的execute方法(因爲ExecutorService接口是extends了Executor接口的,因此擁有了execute方法),還有一種是帶有返回值的submit方法。在submit方法中,提供了三個重載方法:多線程

    <T> Future<T> submit(Callable<T> task);
        Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);複製代碼

    能夠看到,submit方法支持實現了callable和runnable的task,不一樣於runnable只有沒有返回值的run方法,callable提供了一個帶返回值的call方法,能夠有返回值。正是由於runnable沒有返回值,因此第二個重載方法返回值爲null,第三個重載方法裏面能夠從外部設置一個返回值,這個返回值將會做爲runnable的返回值。具體代碼以下:併發

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }複製代碼

    兩個方法都調用newTaskFor方法來建立了一個RunnableFuture的對象,而後調用execute方法來執行這個對象,說明咱們線程池真正執行的對象就是這個RunnableFuture對象。框架

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }複製代碼

    由上面代碼看出就是建立了一個futureTask對象,這個對象封裝了咱們提供的runnable和callable對象。futuretask實現了runnablefuture接口,這就是說明futuretask具有了runnable的功能(能被線程執行)和future功能(可以獲取自身執行的結果和狀態)。能被線程執行功能是咱們本身經過實現runnable接口或者callable接口來完成的。future功能前面咱們提過是很通用的功能,因此java給咱們實現了。下面就進入futuretask查看。異步

  • futuretask對象:futuretask是真正的future功能實現的地方。前面說過這個一個RunnableFuture對象,因此咱們看看它的run方法this

    private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;    
        /** 封裝的callable對象 */
        private Callable<V> callable;
        /** task的執行結果 */
        private Object outcome; 
        /** 當前線程池的哪一個線程正在執行這個task */
        private volatile Thread runner;
        /** 等待的線程列表 */
        private volatile WaitNode waiters;
    
        public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;// 1. 內部包裝的一個callable對象
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();// 2. 調用包裝的call方法
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//3. 設置返回值
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }複製代碼

    前面提到futuretask是封裝了runnable和callable的,但是爲何內部只有一個callable呢,其實是由於futuretask本身調用適配器轉換了一下:代碼以下,採用了java的適配器模式。spa

    public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }複製代碼

    futuretask的run方法調用了內部封裝的callable對象的call方法,獲取返回值,而且設置到本身outcome中,state表明執行的狀態,這樣就經過代理的方式代理了咱們的callable的call方法,幫助咱們獲取執行的結果和狀態,因此咱們本身編寫業務邏輯的時候就不用去管這層通用的邏輯了。這裏面還有一個waitnode咱們單獨講線程

  • WaitNode: 經過前面的分析咱們知道,實際上咱們submit任務以後返回的future對象就是線程池爲咱們建立的runnablefuture對象,也就是futuretask這個對象。future接口爲咱們提供了一系列的方法,以下設計

    V get() throws InterruptedException, ExecutionException;
        boolean cancel(boolean mayInterruptIfRunning);複製代碼

    上面是主要的兩個方法,get和cancel,cancel的時候調用runner的interrupt方法便可

    public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }複製代碼

    其中unsafe是用於cas操做的,在java併發包中大量用到,後續會講解。

    get方法的設計是阻塞的,也就是說若是結果沒有返回時須要等待的,因此纔會有waitnode這個對象的產生,當多個線程都調用futuretask的get方法的時候,若是結果還沒產生,就都須要等待,這時候全部等待的線程就會造成一個鏈表,因此waitnode實際上就是線程的鏈表。

    static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }複製代碼

    再看get方法:若是任務沒有完成就調用awaitDone進入阻塞,若是完成了直接調用report返回結果

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }複製代碼
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {//1. 若是等待過程當中,被中斷過了,那麼就移除本身
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)//2. cas更新鏈表節點
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);//3. locksupport原語讓線程進入休眠
                }
                else
                    LockSupport.park(this);
            }
        }複製代碼

    仍是比較好看懂,其中LockSupport是原語,讓線程進行休眠。若是線程在休眠中醒來了,有多是多種狀況,好比get的時間到了,也就是從3中醒來了,這樣的話下一次循環就會判斷時間到了,從而remove掉節點退出。還有可能等待的線程被interrupt了,這時候就會走到1的邏輯,經過判斷中斷標記將其remove掉。

    既然有了waitnode這個等待鏈表,那麼確定會有相應的喚醒機制,當執行完畢以後就會將waitnode鏈表上的線程一次喚醒,以下。

    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }複製代碼

實際上java的future接口所提供的功能比較有限,好比listen機制就沒有,都須要異步任務發起者主動去查詢狀態和結果,並且沒有提供非阻塞的等待機制。可是咱們能夠本身靈活的實現,後續將參照netty中的future機制進行詳細講解。

相關文章
相關標籤/搜索