Netty 源碼分析之 四 Promise 與 Future: 雙子星的祕密

永順大牛寫的系列教程《源碼之下無祕密 ── 作最好的 Netty 源碼分析教程》是目前我讀過最好的netty源碼分析文章。但不知道什麼緣由,做者在寫到第三章的時候停更了。所以,我想嘗試憑着我的的理解,續寫後邊幾個章節。java

寫在最前

永順前輩已經寫完章節有以下:git

續寫章節:github

本文使用的netty版本爲4.1.33編程

Future<V>和Promise<V>的關係

Netty內部的io.netty.util.concurrent.Future<V> 繼承自java.util.concurrent.Future<V>,而Promise<V>是前者的一個特殊實現。
Future和Promise類圖.pngbootstrap

Java原生Future<V>

Java併發編程包下提供了Future<V>接口。Future在異步編程中表示該異步操做的結果,經過Future<V>的內部方法能夠實現狀態檢查、取消執行、獲取執行結果等操做。內部的方法以下:segmentfault

// 嘗試取消執行
    boolean cancel(boolean mayInterruptIfRunning);
    // 是否已經被取消執行
    boolean isCancelled();
    // 是否已經執行完畢
    boolean isDone();
    // 阻塞獲取執行結果
    V get() throws InterruptedException, ExecutionException;
    // 阻塞獲取執行結果或超時後返回
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Netty對Future<V>的擴展

原生的Future<V>功能比較有限,Netty擴展了Future<V>並增長了如下方法:數組

  • 增長了更加豐富的狀態判斷方法
// 判斷是否執行成功
    boolean isSuccess();
    // 判斷是否能夠取消執行
    boolean isCancellable();
  • 支持獲取致使I/O操做異常
Throwable cause();
  • 增長了監聽回調有關方法,支持future完成後執行用戶指定的回調方法
// 增長回調方法
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 增長多個回調方法
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 刪除回調方法
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 刪除多個回調方法
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  • 增長了更豐富的阻塞等待結果返回的兩類方法。其中一類是sync方法,阻塞等待結果且若是執行失敗後向外拋出致使失敗的異常;另一類是await方法,僅阻塞等待結果返回,不向外拋出異常。
// 阻塞等待,且若是失敗拋出異常
    Future<V> sync() throws InterruptedException;
    // 同上,區別是不可中斷阻塞等待過程
    Future<V> syncUninterruptibly();

    // 阻塞等待
    Future<V> await() throws InterruptedException;
    // 同上,區別是不可中斷阻塞等待過程
    Future<V> awaitUninterruptibly();

Promise<V>

Promise<V>接口繼續繼承了Future<V>,並增長若干個設置狀態並回調的方法:promise

// 設置成功狀態並回調
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    // 設置失敗狀態並回調
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // 設置爲不可取消狀態
    boolean setUncancellable();

可見,Promise<V>做爲一個特殊的Future<V>,只是增長了一些狀態設置方法。因此它經常使用於傳入I/O業務代碼中,用於I/O結束後設置成功(或失敗)狀態,並回調方法。緩存

經過Promise設置I/O執行結果

以客戶端鏈接的註冊過程爲例,調用鏈路以下:服務器

io.netty.bootstrap.Bootstrap.connect()
--> io.netty.bootstrap.Bootstrap.doResolveAndConnect()
---->io.netty.bootstrap.AbstractBootstrap.initAndRegister()
------>io.netty.channel.MultithreadEventLoopGroup.register()
-------->io.netty.channel.SingleThreadEventLoop.register()

一直跟蹤到SingleThreadEventLoop中,會看到這段代碼:

@Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

此處新建了一個DefaultChannelPromise,構造函數傳入了當前的channel以及當前所在的線程this。從第一節的類圖咱們知道,DefaultChannelPromise同時實現了Future和Promise,具備上述提到的全部方法。

而後繼續將該promise傳遞進另一個register方法中:

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

在該register方法中,繼續將promise傳遞到Unsafe的register方法中,而當即返回了以ChannelFuture的形式返回了該promise。顯然這裏是一個異步回調處理:上層的業務能夠拿到返回的ChannelFuture阻塞等待結果或者設置回調方法,而繼續往下傳的Promise能夠用於設置執行狀態而且回調設置的方法。

咱們繼續往下debug能夠看到:

// io.netty.channel.AbstractChannel.AbstractUnsafe.java
 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                // 若是已經註冊過,則置爲失敗
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                // 若是線程類型不兼容,則置爲失敗
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    // 出現異常狀況置promise爲失敗
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // 註冊以前,先將promise置爲不可取消轉態
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();
                // promise置爲成功
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
             
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                // 出現異常狀況置promise爲失敗
                safeSetFailure(promise, t);
            }
        }

可見,底層的I/O操做成功與否均可以經過Promise設置狀態,並使得外層的ChannelFuture能夠感知獲得I/O操做的結果。

經過ChannelFuture獲取I/O執行結果

咱們再來看看被返回的ChannelFuture的用途:

// io.netty.bootstrap.AbstractBootstrap.java

    final ChannelFuture initAndRegister() {
        //...
        ChannelFuture regFuture = config().group().register(channel);
        // 若是異常不爲null,則意味着底層的I/O已經失敗,而且promise設置了失敗異常
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

這裏經過檢查失敗異常棧是否爲空,能夠提早檢查到I/O是否失敗。繼續回溯,還能夠看到:

// io.netty.bootstrap.AbstractBootstrap.java

 private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 若是註冊已經成功
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
       // 若是註冊還沒有完成
       // ...
    }
}

此處,經過ChannelFuture#isDone()方法能夠知道底層的註冊是否完成,若是完成,則繼續進行bind操做。

可是由於註冊是個異步操做,若是此時註冊可能還沒完成,那就會進入以下邏輯:

// io.netty.bootstrap.AbstractBootstrap.java

//...
else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

這裏新建了一個新的PendingRegistrationPromise,併爲原來的ChannelFuture對象添加了一個回調方法,並在回調中更改PendingRegistrationPromise的狀態,並且PendingRegistrationPromise會繼續被傳遞到上層。當底層的Promise狀態被設置而且回調,就會進入該回調方法。從而將I/O狀態繼續向外傳遞。

DefaultChannelPromise的結果傳遞實現原理

咱們已經瞭解清楚了Promise和Future的異步模型。再來看看底層是如何實現的。以最經常使用的DefaultChannelPromise爲例,內部很是簡單,咱們主要看它的父類DefaultPromise:

// result字段的原子更新器
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // 緩存執行結果的字段
    private volatile Object result;
    // promise所在的線程
    private final EventExecutor executor;
    // 一個或者多個回調方法
    private Object listeners;
    // 阻塞線程數量計數器  
    private short waiters;

設置狀態

以設置成功狀態爲例(setSuccess):

@Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            // 調用回調方法
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        // 原子修改result字段爲objResult
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

    private synchronized void checkNotifyWaiters() {
        if (waiters > 0) {
            // 若是有其餘線程在等待該promise的結果,則喚醒他們
            notifyAll();
        }
    }

設置promise的狀態其實就是原子地修改result字段爲傳入的執行結果。值得注意的是,result字段帶有volatile關鍵字來確保多線程之間的可見性。另外,設置完畢狀態後,會嘗試喚醒全部在阻塞等待該promise返回結果的線程。

其餘設置狀態方法再也不贅言,基本上大同小異。

阻塞線程以等待執行結果

上文提到其餘線程會阻塞等待該promise返回結果,具體實現以sync方法爲例:

@Override
    public Promise<V> sync() throws InterruptedException {
        // 阻塞等待
        await();
        // 若是有異常則拋出
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            // 若是已經完成,直接返回
            return this;
        }
        // 能夠被中斷
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        //檢查死循環
        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                // 遞增計數器(用於記錄有多少個線程在等待該promise返回結果)
                incWaiters();
                try {
                    // 阻塞等待結果
                    wait();
                } finally {
                    // 遞減計數器
                    decWaiters();
                }
            }
        }
        return this;
    }

全部調用sync方法的線程,都會被阻塞,直到promise被設置爲成功或者失敗。這也解釋了爲什麼Netty客戶端或者服務端啓動的時候通常都會調用sync方法,本質上都是阻塞當前線程而異步地等待I/O結果返回,以下:

Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 管道中添加基於換行符分割字符串的解析器
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    // 管道中添加字符串編碼解碼器
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                    // 管道中添加服務端處理邏輯
                    ch.pipeline().addLast(new MyClientEchoHandler());
                }
            }).connect("127.0.0.1", 9898).sync();

    future.channel().closeFuture().sync();

回調機制

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            // 添加回調方法
            addListener0(listener);
        }

        if (isDone()) {
            // 若是I/O操做已經結束,直接觸發回調
            notifyListeners();
        }

        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            // 只有一個回調方法直接賦值
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            // 將回調方法添加到DefaultFutureListeners內部維護的listeners數組中
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // 若是有多個回調方法,新建一個DefaultFutureListeners以保存更多的回調方法
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

從上邊能夠看到,添加回調方法完成以後,會當即檢查promise是否已經完成;若是promise已經完成,則立刻調用回調方法。

總結

Netty的Promise和Future機制是基於Java併發包下的Future<V>開發的。其中Future支持阻塞等待、添加回調方法、判斷執行狀態等,而Promise主要是支持狀態設置相關方法。當底層I/O操做經過Promise改變執行狀態,咱們能夠經過同步等待的Future當即獲得結果。

所以,就像永順大牛標題所言,在Netty的異步模型裏,Promise和Future就像是雙子星通常緊密相連。但我以爲這二者更像是量子糾纏裏的兩個電子,由於改變其中一個方的狀態,另一方可以立刻感知。

至此,Promise和Future的核心原理已經分析完畢。

相關文章
相關標籤/搜索