自頂向下深刻分析Netty(五)--Future

再次回顧這幅圖,在上一章中,咱們分析了Reactor的完整實現。因爲Java NIO事件驅動的模型,要求Netty的事件處理採用異步的方式,異步處理則須要表示異步操做的結果。Future正是用來表示異步操做結果的對象,Future的類簽名爲:javascript

public interface Future<V>; 

其中的泛型參數V即表示異步結果的類型。java

5.1 總述

也許你已經使用過JDK的Future對象,該接口的方法以下:數組

    // 取消異步操做
    boolean cancel(boolean mayInterruptIfRunning);
    // 異步操做是否取消
    boolean isCancelled();
    // 異步操做是否完成,正常終止、異常、取消都是完成
    boolean isDone();
    // 阻塞直到取得異步操做結果
    V get() throws InterruptedException, ExecutionException;
    // 同上,但最長阻塞時間爲timeout
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

 

咱們的第一印象會以爲這樣的設計並不壞,但仔細思考,便會發現問題:
(1).接口中只有isDone()方法判斷一個異步操做是否完成,可是對於完成的定義過於模糊,JDK文檔指出正常終止、拋出異常、用戶取消都會使isDone()方法返回真。在咱們的使用中,咱們極有多是對這三種狀況分別處理,而JDK這樣的設計不能知足咱們的需求。
(2).對於一個異步操做,咱們更關心的是這個異步操做觸發或者結束後可否再執行一系列動做。好比說,咱們瀏覽網頁時點擊一個按鈕後實現用戶登陸。在javascript中,處理代碼以下:緩存

    $("#login").click(function(){
        login();
    });

 

可見在這樣的狀況下,JDK中的Future便不能處理,因此,Netty擴展了JDK的Future接口,使其能解決上面的兩個問題。擴展的方法以下(相似方法只列出一個):多線程

    // 異步操做完成且正常終止
    boolean isSuccess();
    // 異步操做是否能夠取消
    boolean isCancellable();
    // 異步操做失敗的緣由
    Throwable cause();
    // 添加一個監聽者,異步操做完成時回調,類比javascript的回調函數
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到異步操做完成
    Future<V> await() throws InterruptedException;
    // 同上,但異步操做失敗時拋出異常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回異步結果,若是還沒有完成返回null
    V getNow();

 

若是你對Future的狀態還有疑問,放上代碼註釋中的ascii圖打消你的疑慮:異步

 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+

 

可知,Future對象有兩種狀態還沒有完成和已完成,其中已完成又有三種狀態:成功、失敗、用戶取消。各狀態的狀態斷言請在此圖中查找。
仔細看完上面的圖並聯系Future接口中的方法,你是否是也會和我有相同的疑問:Future接口中的方法都是getter方法而沒有setter方法,也就是說這樣實現的Future子類的狀態是不可變的,若是咱們想要變化,那該怎麼辦呢?Netty提供的解決方法是:使用可寫的Future即Promise。Promise接口擴展的方法以下:ide

    // 標記異步操做結果爲成功,若是已被設置(無論成功仍是失敗)則拋出異常IllegalStateException
    Promise<V> setSuccess(V result);
    // 同上,只是結果已被設置時返回False
    boolean trySuccess(V result);

    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

   // 設置結果爲不可取消,結果已被取消返回False
    boolean setUncancellable();

 

須要注意的是:Promise接口繼承自Future接口,它提供的setter方法與常見的setter方法大爲不一樣。Promise從Uncompleted-->Completed的狀態轉變有且只能有一次,也就是說setSuccess和setFailure方法最多隻會成功一個,此外,在setSuccess和setFailure方法中會通知註冊到其上的監聽者。爲了加深對Future和Promise的理解,咱們能夠將Future類比於定額發票,Promise類比於機打發票。當商戶拿到稅務局的發票時,若是是定額發票,則已經肯定好金額是100仍是50或其餘,商戶不再能更改;若是是機打發票,商戶至關於拿到了一個發票模板,須要多少金額按實際狀況填到模板指定處。顯然,不能兩次使用同一張機打發票打印,這會使發票失效,而Promise作的更好,它使第二次調用setter方法失敗。
至此,咱們從整體上了解了Future和Promise的原理。咱們再看一下類圖:
函數

Future類圖
Future類圖

 

類圖給咱們的第一印象是:繁雜。咱們抓住關鍵點:Future和Promise兩條分支,分而治之。咱們使用自頂向下的方法分析其實現細節,使用兩條線索:oop

    AbstractFuture<--CompleteFuture<--CompleteChannelFuture<--Succeeded/FailedChannelFuture
    
    DefaultPromise<--DefaultChannelPromise

 

5.2 Future

5.2.1 AbstractFuture

AbstractFuture主要實現Future的get()方法,取得Future關聯的異步操做結果:this

    @Override
    public V get() throws InterruptedException, ExecutionException {
        await();    // 阻塞直到異步操做完成

        Throwable cause = cause();
        if (cause == null) {
            return getNow();    // 成功則返回關聯結果
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;    // 由用戶取消
        }
        throw new ExecutionException(cause);    // 失敗拋出異常
    }

 

其中的實現簡單明瞭,但關鍵調用方法的具體實現並無,咱們將在子類實現中分析。對應的加入超時時間的get(long timeout, TimeUnit unit)實現也相似,再也不列出。

5.2.2 CompleteFuture

Complete表示操做已完成,因此CompleteFuture表示一個異步操做已完成的結果,由此可推知:該類的實例在異步操做完成時建立,返回給用戶,用戶則使用addListener()方法定義一個異步操做。若是你熟悉javascript,將Listener類比於回調函數callback()可方便理解。
咱們首先看其中的字段和構造方法:

    // 執行器,執行Listener中定義的操做
    private final EventExecutor executor;
    
    // 這有一個構造方法,可知executor是必須的
    protected CompleteFuture(EventExecutor executor) {
        this.executor = executor;
    }

 

CompleteFuture類定義了一個EventExecutor,可視爲一個線程,用於執行Listener中的操做。咱們再看addListener()和removeListener()方法:

    public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 因爲這是一個已完成的Future,因此當即通知Listener執行
        DefaultPromise.notifyListener(executor(), this, listener);
        return this;
    }
    
    public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 因爲已完成,Listener中的操做已完成,沒有須要刪除的Listener
        return this;
    }

 

其中的實現也很簡單,咱們看一下GenericFutureListener接口,其中只定義了一個方法:

    // 異步操做完成是調用
    void operationComplete(F future) throws Exception;
關於Listener咱們再關注一下ChannelFutureListener,它並無擴展GenericFutureListener接口,因此相似於一個標記接口。咱們看其中實現的三個通用ChannelFutureListener:

    ChannelFutureListener CLOSE = (future) --> {
        future.channel().close();   //操做完成時關閉Channel
    };
    
    ChannelFutureListener CLOSE_ON_FAILURE = (future) --> {
        if (!future.isSuccess()) {
            future.channel().close();   // 操做失敗時關閉Channel
        }
    };
    
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = (future) --> {
        if (!future.isSuccess()) {
            // 操做失敗時觸發一個ExceptionCaught事件
            future.channel().pipeline().fireExceptionCaught(future.cause());
        }
    };

 

這三個Listener對象定義了對Channel處理時經常使用的操做,若是符合需求,能夠直接使用。
因爲CompleteFuture表示一個已完成的異步操做,因此可推知sync()和await()方法都將當即返回。此外,可推知線程的狀態以下,再也不列出代碼:

    isDone() = true; isCancelled() = false; 

 

5.2.3 CompleteChannelFuture

CompleteChannelFuture的類簽名以下:

    abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture

 

ChannelFuture是否是以爲很親切?你確定已經使用過ChannelFuture。ChannelFuture接口相比於Future只擴展了一個方法channel()用於取得關聯的Channel對象。CompleteChannelFuture還繼承了CompleteFuture<Void>,尖括號中的泛型表示Future關聯的結果,此結果爲Void,意味着CompleteChannelFuture不關心這個特定結果即get()相關方法返回null。也就是說,咱們能夠將CompleteChannelFuture純粹的視爲一種回調函數機制。
CompleteChannelFuture的字段只有一個:

    private final Channel channel; // 關聯的Channel對象

 

CompleteChannelFuture的大部分方法實現中,只是將方法返回的Future覆蓋爲ChannelFuture對象(ChannelFuture接口的要求),代碼不在列出。咱們看一下executor()方法:

    @Override
    protected EventExecutor executor() {
        EventExecutor e = super.executor(); // 構造方法指定
        if (e == null) {
            return channel().eventLoop();   // 構造方法未指定使用channel註冊到的eventLoop
        } else {
            return e;
        }
    }

 

5.2.4 Succeeded/FailedChannelFuture

Succeeded/FailedChannelFuture爲特定的兩個異步操做結果,回憶總述中關於Future狀態的講解,成功意味着

    Succeeded: isSuccess() == true, cause() == null;
    Failed:    isSuccess() == false, cause() == non-null     

 

代碼中的實現也很簡單,再也不列出。須要注意的是,其中的構造方法不建議用戶調用,通常使用Channel對象的方法newSucceededFuture()和newFailedFuture(Throwable)代替。

5.3 Promise

5.3.1 DefaultPromise

咱們首先看其中的static字段:

    // 能夠嵌套的Listener的最大層數,可見最大值爲8
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    // result字段由使用RESULT_UPDATER更新
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
    // 此處的Signal是Netty定義的類,繼承自Error,異步操做成功且結果爲null時設置爲改值
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
    // 異步操做不可取消
    private static final Signal UNCANCELLABLE = Signal.valueOf(...);
    // 異步操做失敗時保存異常緣由
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(...);

 

嵌套的Listener,是指在listener的operationComplete方法中,能夠再次使用future.addListener()繼續添加listener,Netty限制的最大層數是8,用戶可以使用系統變量io.netty.defaultPromise.maxListenerStackDepth設置。
再看其中的私有字段:

    // 異步操做結果
    private volatile Object result;
    // 執行listener操做的執行器
    private final EventExecutor executor;
    // 監聽者
    private Object listeners;
    // 阻塞等待該結果的線程數
    private short waiters;
    // 通知正在進行標識
    private boolean notifyingListeners;

 

也許你已經注意到,listeners是一個Object類型。這彷佛不合常理,通常狀況下咱們會使用一個集合或者一個數組。Netty之因此這樣設計,是由於大多數狀況下listener只有一個,用集合和數組都會形成浪費。當只有一個listener時,該字段爲一個GenericFutureListener對象;當多餘一個listener時,該字段爲DefaultFutureListeners,能夠儲存多個listener。明白了這些,咱們分析關鍵方法addListener():

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        synchronized (this) {
            addListener0(listener); // 保證多線程狀況下只有一個線程執行添加操做
        }

        if (isDone()) {
            notifyListeners();  // 異步操做已經完成通知監聽者
        }
        return this;
    }
    
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;   // 只有一個
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener); // 大於兩個
        } else {
            // 從一個擴展爲兩個
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);   
        }
    }

 

從代碼中能夠看出,在添加Listener時,若是異步操做已經完成,則會notifyListeners():

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {   //執行線程爲指定線程
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth(); // 嵌套層數
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                // 執行前增長嵌套層數
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);   
                try {
                    notifyListenersNow();
                } finally {
                    // 執行完畢,不管如何都要回滾嵌套層數
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        // 外部線程則提交任務給執行線程
        safeExecute(executor, () -> { notifyListenersNow(); });
    }
    
    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

 

因此,外部線程不能執行監聽者Listener中定義的操做,只能提交任務到指定Executor,其中的操做最終由指定Executor執行。咱們再看notifyListenersNow()方法:

    private void notifyListenersNow() {
        Object listeners;
        // 此時外部線程可能會執行添加Listener操做,因此須要同步
        synchronized (this) { 
            if (notifyingListeners || this.listeners == null) {
                // 正在通知或已沒有監聽者(外部線程刪除)直接返回
                return; 
            }
            notifyingListeners = true;  
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) { // 通知單個
                notifyListeners0((DefaultFutureListeners) listeners);
            } else { // 通知多個(遍歷集合調用單個)
                notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
            }
            synchronized (this) {
                // 執行完畢且外部線程沒有再添加監聽者
                if (this.listeners == null) {
                    notifyingListeners = false; 
                    return; 
                }
                // 外部線程添加了監聽者繼續執行
                listeners = this.listeners; 
                this.listeners = null;
            }
        }
    }
    
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }

 

到此爲止,咱們分析完了Promise最重要的addListener()和notifyListener()方法。在源碼中還有static的notifyListener()方法,這些方法是CompleteFuture使用的,對於CompleteFuture,添加監聽者的操做不須要緩存,直接執行Listener中的方法便可,執行線程爲調用線程,相關代碼可回顧CompleteFuture。addListener()相對的removeListener()方法實現簡單,咱們再也不分析。
回憶result字段,修飾符有volatile,因此使用RESULT_UPDATER更新,保證更新操做爲原子操做。Promise不攜帶特定的結果(即攜帶Void)時,成功時設置爲靜態字段的Signal對象SUCCESS;若是攜帶泛型參數結果,則設置爲泛型一致的結果。對於Promise,設置成功、設置失敗、取消操做,三個操做至多隻能調用一個且同一個方法至多生效一次,再次調用會拋出異常(set)或返回失敗(try)。這些設置方法原理相同,咱們以setSuccess()爲例分析:

    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();  // 能夠設置結果說明異步操做已完成,故通知監聽者
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private boolean setSuccess0(V result) {
        // 爲空設置爲Signal對象Success
        return setValue0(result == null ? SUCCESS : result);
    }
    
    private boolean setValue0(Object objResult) {
        // 只有結果爲null或者UNCANCELLABLE時纔可設置且只能夠設置一次
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();   // 通知等待的線程
            return true;
        }
        return false;
    }

 

checkNotifyWaiters()方法喚醒調用await()和sync()方法等待該異步操做結果的線程,代碼以下:

    private synchronized void checkNotifyWaiters() {
        // 確實有等待的線程才notifyAll
        if (waiters > 0) {  
            notifyAll();    // JDK方法
        }
    }

 

有了喚醒操做,那麼sync()和await()的實現是怎麼樣的呢?咱們首先看sync()的代碼:

    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();  // 異步操做失敗拋出異常
        return this;
    }

 

可見,sync()和await()很相似,區別只是sync()調用,若是異步操做失敗,則會拋出異常。咱們接着看await()的實現:

    public Promise<V> await() throws InterruptedException {
        // 異步操做已經完成,直接返回
        if (isDone()) {
            return this;    
        }
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // 死鎖檢測
        checkDeadLock();
        // 同步使修改waiters的線程只有一個
        synchronized (this) {
            while (!isDone()) { // 等待直到異步操做完成
                incWaiters();   // ++waiters;
                try {
                    wait(); // JDK方法
                } finally {
                    decWaiters(); // --waiters
                }
            }
        }
        return this;
    }

 

其中的實現簡單明瞭,其餘await()方法也相似,再也不分析。咱們注意其中的checkDeadLock()方法用來進行死鎖檢測:

    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

 

也就是說,不能在同一個線程中調用await()相關的方法。爲了更好的理解這句話,咱們使用代碼註釋中的例子來解釋。Handler中的channelRead()方法是由Channel註冊到的eventLoop執行的,其中的Future的Executor也是這個eventLoop,因此不能在channelRead()方法中調用await這一類(包括sync)方法

    // 錯誤的例子
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.channel().close();
        future.awaitUninterruptibly();
        // ...
    }

    // 正確的作法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.channel().close();
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                // ... 使用異步操做
            }
        });
    }

 

到了這裏,咱們已經分析完Future和Promise的主要實現。剩下的DefaultChannelPromise、VoidChannelPromise實現都很簡單,咱們再也不分析。ProgressivePromise表示異步的進度結果,也再也不進行分析。

做者:Hypercube 連接:https://www.jianshu.com/p/a06da3256f0c 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
相關文章
相關標籤/搜索