Future Promise 模式(netty源碼9)

netty源碼死磕9  html

Future Promise 模式詳解java

1. Future/Promise 模式

1.1. ChannelFuture的由來


因爲Netty中的Handler 處理都是異步IO操做,結果是未知的。編程

Netty繼承和擴展了JDK Future的API,定義了自身的Future系列類型,實現異步操做結果的獲取和監控。promise


其中,最爲重要的是ChannelFuture 。網絡

代碼以下:異步

public interface ChannelFuture extends Future<Void> {

    //...

    Channel channel();

    @Override

    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

      @Override

    ChannelFuture sync() throws InterruptedException;

    //...

}


之因此命名爲ChannelFuture,表示跟Channel的操做有關。socket

ChannelFuture用於獲取Channel相關的操做結果,添加事件監聽器,取消IO操做,同步等待。ide


1.2. 來自Netty的官方建議


java.util.concurrent.Future是Java提供的接口,提供了對異步操做的簡單幹預。oop

Future接口定義了isDone()、isCancellable(),用來判斷異步執行狀態。Future接口的get方法,能夠用來獲取結果。get方法首先會判斷任務是否執行完成,若是完成就返回結果,不然阻塞線程,直到任務完成。學習


Netty官方文檔直接說明——Netty的網絡操做都是異步的,Netty源碼上大量使用了Future/Promise模式。


若是用戶操做調用了sync或者await方法,會在對應的future對象上阻塞用戶線程,例如future.channel().closeFuture().sync()。


Netty 的Future 接口,在繼承了java.util.concurrent.Future的基礎上,增長了一系列監聽器方法,好比addListener()、removeListener() 等等。Netty強烈建議,經過添加監聽器的方式獲取IO結果,而不是經過JDK Future的同步等待的方式去獲取IO結果。


1.3. Netty 的 Future 接口


Netty擴展了Java的Future,增長了監聽器Listener接口,經過監聽器可讓異步執行更加有效率,不須要經過get來等待異步執行結束,而是經過監聽器回調來精確地控制異步執行結束的時間點。

這一點,正好是Netty在Future模式的最主要的改進。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();

    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> sync() throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    boolean cancel(boolean mayInterruptIfRunning);

}

1.4. ChannelFuture使用的實例

Netty的出站和入站操做,都是異步的。

以最爲經典的NIO出站操做——write出站爲例,說一下ChannelFuture的使用。

代碼以下:

ChannelFuture future = ctx.channel().write(msg);
future.addListener(
        new ChannelFutureListener()
        {
            @Override
            public void operationComplete(ChannelFuture future)
            {
                // write操做完成後的回調代碼
            }
        });


在write操做調用後,Netty並無完成對Java NIO底層鏈接的寫入操做,出站操做是異步執行的。

若是須要獲取IO結果,可使用回調的方式。


使用ChannelFuture的異步完成後的回調,須要搭配使用另外的一個接口ChannelFutureListener ,他從父接口哪裏繼承了一個被回調到的operationComplete操做完成的方法。

ChannelFutureListener 的父親接口是GenericFutureListener 接口。

定義以下:

public interface GenericFutureListener
                 <F extends Future<?>> extends EventListener
{
      void operationComplete(F future) throws Exception;
}


異步操做完成後的回調代碼,放在operationComplete方法中的實現中,就能夠了。


1.5. Netty的 Promise接口


Netty的Future,只是增長了監聽器。整個異步的狀態,是不能進行設置和修改的。

換句話說,Future是隻讀的,是不能夠寫的。

因而,Netty的 Promise接口擴展了Netty的Future接口,它表示一種可寫的Future,就是能夠設置異步執行的結果。

部分源碼以下:

public interface Promise<V> extends Future<V> {

    Promise<V> setSuccess(V result);

    Promise<V> setFailure(Throwable cause);

    boolean setUncancellable();

   //....

}
在IO操做過程,若是順利完成、或者發生異常,均可以設置Promise的結果,而且通知Promise的Listener們。


而ChannelPromise接口,則繼承擴展了Promise和ChannelFuture。因此,ChannelPromise既綁定了Channel,又具有了監聽器的功能,還能夠設置IO操做的結果,是Netty實際編程使用的最多的接口。


在AbstratChannel的代碼中,至關多的IO操做,都會返回ChannelPromise類型實例做爲調用的返回值。 經過這個返回值,客戶程序能夠用於讀取IO操做的結果,執行IO操做真正完成後的回調。


1.6. ChannelPromise的監控流程

在AbstractChannel中,定義了幾個對Channel的異步狀態進行監控的Promise和Future成員,用於監控Channel的鏈接是否成功,鏈接是否關閉。

源碼以下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

        //鏈接成功的監控
        private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);

        //鏈接關閉的監控
        private final CloseFuture closeFuture = new CloseFuture(this);

//...
}
對於每一個Channel對象,都會有惟一的一個CloseFuture 成員,用來表示關閉的異步干預。若是要監控Channel的關閉,或者同步等待Channel關閉。

通常狀況下,在應用程序中使用以下的代碼:

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();


通常來講,編寫以上代碼的都是在Main線程中用來啓動ServerBootStrap的,因此Main線程會被阻塞,保證服務端Channel的正常運行。

上面的代碼中,channel.closeFuture()不作任何操做,只是簡單的返回channel對象中的closeFuture對象。而CloseFuture的sync方法,會將當前線程阻塞在CloseFuture上。


那麼,f.channel().closeFuture().sync() 實際是如何工做的呢?


1.7. CloseFuture的sync 同步方法


CloseFuture繼承了DefaultPromise的sync同步方法。

DefaultPromise的代碼以下:

public class DefaultPromise<V> extends AbstractFuture<V>
                                                 implements Promise<V>
{

     private volatile Object result;
      //...
    @Override
    public Promise<V>  sync() throws InterruptedException {

       await();
        //...
    }

   @Override
    public Promise<V> await() throws InterruptedException {

        //...
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
               try {
                    wait();  //阻塞了,死等
                } finally {
                   decWaiters();
                }
            }
        }
        return this;
    }
//...

}


從源碼能夠看出,sync方法,調用了await方法。

在await方法中,CloseFuture 使用java 基礎的synchronized 方法進行線程同步;而且,使用CloseFuture.wait / notify 這組來自Object根類中的古老方法進行線程之間的等待和喚醒。


在await方法,不斷的自旋,判斷當前的 CloseFuture 實例的結果是否已經完成,若是沒有完成 !isDone() ,就不斷的等待。一直到 isDone() 的值爲true。

isDone() 的源碼以下:

@Override
public boolean isDone() {
        return isDone0(result);
}

private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
}


CloseFuture的 isDone() 的條件是否可以知足,和Channel的close 關閉鏈接的出站操做有關。


下一步,咱們來看 isDone() 的條件,如何纔可以知足?



1.8. close 出站處理流程


在Netty中,close 關閉鏈接的操做,屬於全部的出站操做的一種。關於Netty出站處理的流程,在前面的文字中,已經很是詳細的介紹了。這裏再也不贅述,只是簡單的列出一個流程圖。

close 關閉鏈接的出站操做,其流程以下圖所示:

wps6221.tmp


一溜兒下來,最終會落地到unsafe.doClose 方法。

看看unsafe.doClose,是如何與CloseFuture的 isDone() 的條件進行關聯的。


1.9. unsafe.doClose


unsafe.doClose 方法中,設置了CloseFuture 的result值。

unsafe.doClose 源碼以下:


protected abstract class AbstractUnsafe implements Unsafe

{

   private void close(final ChannelPromise promise,…) {
        //…
        try {
            // Close the channel
            doClose0(promise);
        } finally {
            // Fail all the queued messages.
            outboundBuffer.failFlushed(cause, notify);
            outboundBuffer.close(closeCause);
        }
         //……
    }
    }
   private void doClose0(ChannelPromise promise) {
    try {
        doClose();
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
  }

  //……

}



1.10. closeFuture.setClosed()


在closeFuture.setClosed() 設置關閉的結果的過程當中,主要完成如下三個工做:

1  設置result的值

2  notifyAll,喚醒在本Promise上等待的線程

3  回調listener

closeFuture.setClosed()的主要源碼以下:

boolean setClosed() {
     return super.trySuccess();
}

@Override

//這個定義在父類中
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
         return true;
    }
    return false;
}

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}


上面的 notifyListeners()調用,就是用來喚醒了等待在closeFuture 實例對象上的等待線程。

到了這裏,終於鬆了一口氣了。

以前經過 f.channel().closeFuture().sync()  同步操做,阻塞在哪兒的Main線程,終於經過channel.close() 方法,給喚醒了。


1.11. 警戒死鎖:Reactor線程不能sync


在上面的源碼中,最終觸發future對象的notify動做的線程,都是eventLoop線程(Reactor線程)。

通常狀況下,Channel的出站和入站操做,也都是在eventLoop線程的輪詢任務中完成的。


例如由於不管是用戶直接關閉channel,或者eventLoop的輪詢狀態關閉channel,都會在eventLoop的線程內完成notify動做。notify那些經過sync操做,正在等待CloseFuture的哪些阻塞線程。


因此不要在Reactor線程內調用future對象的sync或者await方法。若是在Reactor線程進行sync或者await,會有可能引發死鎖。


爲何呢?

在Reactor線程進行sync時,會進入等待狀態,等待Future(DefaultPromise)的 isDone 的條件知足。經過前面的例子,咱們已經看到了,而Future的isDone的條件,又須要Reactor線程的出站或者入站操做來知足。這是,Reactor線程既然已經處於等待狀態,怎麼可能再進行其餘的出站或者入站操做呢?至關於本身等本身,這就是典型的死鎖。


在實際開發中,因爲應用程序代碼都是編寫在自定義的channelHandler處理器中,而channelHandler是在eventLoop線程(Reactor線程)內執行的。因此,不能在channelHandler中調用Future(DefaultPromise)的sync或者await兩個同步方法。


正確的作法是:經過給Future(DefaultPromise) 增長listeners監聽器 的方式,來干預異步操做的過程,處理異步操做的結果

這樣,能夠避免使用Future帶來的死鎖。



無編程不創客,無案例不學習。瘋狂創客圈,一大波高手正在交流、學習中!

瘋狂創客圈 Netty 死磕系列 10多篇深度文章博客園 總入口】  QQ羣:104131248

相關文章
相關標籤/搜索