Netty 源碼解析(三): Netty 的 Future 和 Promise

今天是猿燈塔「365篇原創計劃」第三篇。 java

接下來的時間燈塔君持續更新Netty系列一共九篇編程

Netty 源碼解析(一): 開始 promise

Netty 源碼解析(二): Netty 的 Channel 異步

當前:Netty 源碼解析(三): Netty 的 Future 和 Promiseide

Netty 源碼解析(四): Netty 的 ChannelPipeline異步編程

Netty 源碼解析(五): Netty 的線程池分析函數

Netty 源碼解析(六): Channel 的 register 操做oop

Netty 源碼解析(七): NioEventLoop 工做流程this

Netty 源碼解析(八): 回到 Channel 的 register 操做spa

Netty 源碼解析(九): connect 過程和 bind 過程分析

今天呢!燈塔君跟你們講: 

Netty 的 Future 和 Promise

Netty 中的異步編程: Future 和 Promise

Netty 中很是多的異步調用,因此在介紹更多 NIO 相關的內容以前,咱們來看看它的異步接口是怎麼使用的。

前面咱們在介紹 Echo 例子的時候,已經用過了 ChannelFuture 這個接口了:

爭取在看完本節後,讀者能搞清楚上面的這幾行劃線部分是怎麼走的。

關於 Future 接口,我想你們應該都很熟悉,用得最多的就是在使用 Java 的線程池 ThreadPoolExecutor 的時候了。在 submit 一個任務到線程池中的時候,返回的就是一個 Future 實例,經過它來獲取提交的任務的執行狀態和最終的執行結果,咱們最經常使用它的 isDone()get() 方法。

下面是 JDK  中的 Future 接口 java.util.concurrent.Future:

public interface Future<V> {
    // 取消該任務
    boolean cancel(boolean mayInterruptIfRunning);
    // 任務是否已取消
    boolean isCancelled();
    // 任務是否已完成
    boolean isDone();
    // 阻塞獲取任務執行結果
    V get() throws InterruptedException, ExecutionException;
    // 帶超時參數的獲取任務執行結果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Netty 中的 Future 接口(同名)繼承了 JDK 中的 Future 接口,而後添加了一些方法:

// io.netty.util.concurrent.Future

publicinterface Future<V\> extends java.util.concurrent.Future<V\> {  
// 是否成功  
boolean isSuccess();  
// 是否可取消  
boolean isCancellable();  
// 若是任務執行失敗,這個方法返回異常信息  
Throwable cause();  
// 添加 Listener 來進行回調  
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);  
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
  
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);  
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
  
// 阻塞等待任務結束,若是任務失敗,將「致使失敗的異常」從新拋出來  
Future<V> sync() throws InterruptedException;  
// 不響應中斷的 sync(),這個你們應該都很熟了  
Future<V> syncUninterruptibly();  
  
// 阻塞等待任務結束,和 sync() 功能是同樣的,不過若是任務失敗,它不會拋出執行過程當中的異常  
Future<V> await() throws InterruptedException;  
Future<V> awaitUninterruptibly();  
boolean await(long timeout, TimeUnit unit) throws InterruptedException;  
boolean await(long timeoutMillis) throws InterruptedException;  
boolean awaitUninterruptibly(long timeout, TimeUnit unit);  
boolean awaitUninterruptibly(long timeoutMillis);  
  
// 獲取執行結果,不阻塞。咱們都知道 java.util.concurrent.Future 中的 get() 是阻塞的  
V getNow();  
  
// 取消任務執行,若是取消成功,任務會由於 CancellationException 異常而致使失敗  
//      也就是 isSuccess()==false,同時上面的 cause() 方法返回 CancellationException 的實例。  
// mayInterruptIfRunning 說的是:是否對正在執行該任務的線程進行中斷(這樣才能中止該任務的執行),  
//       彷佛 Netty 中 Future 接口的各個實現類,都沒有使用這個參數  
@Override  
boolean cancel(boolean mayInterruptIfRunning);  
}

看完上面的 Netty 的 Future 接口,咱們能夠發現,它加了 sync() 和 await() 用於阻塞等待,還加了 Listeners,只要任務結束去回調 Listener 們就能夠了,那麼咱們就不必定要主動調用 isDone() 來獲取狀態,或經過 get() 阻塞方法來獲取值。

因此它其實有兩種使用範式

順便說下 sync() 和 await() 的區別:sync() 內部會先調用 await() 方法,等 await() 方法返回後,會檢查下這個任務是否失敗,若是失敗,從新將致使失敗的異常拋出來。也就是說,若是使用 await(),任務拋出異常後,await() 方法會返回,可是不會拋出異常,而 sync() 方法返回的同時會拋出異常。

咱們也能夠看到,Future 接口沒有和 IO 操做關聯在一塊兒,仍是比較_純淨_的接口。

接下來,咱們來看 Future 接口的子接口 ChannelFuture,這個接口用得最多,它將和 IO 操做中的 Channel 關聯在一塊兒了,用於異步處理 Channel 中的事件。

publicinterface ChannelFuture extends Future<Void\> {  
  
// ChannelFuture 關聯的 Channel  
Channel channel();  
  
// 覆寫如下幾個方法,使得它們返回值爲 ChannelFuture 類型   
@Override  
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);  
@Override  
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);  
@Override  
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);  
@Override  
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);  
  
@Override  
ChannelFuture sync() throws InterruptedException;  
@Override  
ChannelFuture syncUninterruptibly();  
  
@Override  
ChannelFuture await() throws InterruptedException;  
@Override  
ChannelFuture awaitUninterruptibly();  
  
// 用來標記該 future 是 void 的,  
// 這樣就不容許使用 addListener(...), sync(), await() 以及它們的幾個重載方法  
boolean isVoid();  
}

咱們看到,ChannelFuture 接口相對於 Future 接口,除了將 channel 關聯進來沒有增長什麼東西。還有個 isVoid() 方法算是不那麼重要的存在吧。其餘幾個都是方法覆寫,爲了讓返回值類型變爲 ChannelFuture,而不是原來的 Future。

這裏有點跳,咱們來介紹下 Promise 接口,它和 ChannelFuture 接口無關,而是和前面的 Future 接口相關,Promise 這個接口很是重要。

Promise 接口和 ChannelFuture 同樣,也繼承了 Netty 的 Future 接口,而後加了一些 Promise 的內容:

publicinterface Promise<V\> extends Future<V\> {  
// 標記該 future 成功及設置其執行結果,而且會通知全部的 listeners。  
// 若是該操做失敗,將拋出異常(失敗指的是該 future 已經有告終果了,成功的結果,或者失敗的結果)  
Promise<V> setSuccess(V result);  
  
// 和 setSuccess 方法同樣,只不過若是失敗,它不拋異常,返回 false  
boolean trySuccess(V result);  
  
// 標記該 future 失敗,及其失敗緣由。  
// 若是失敗,將拋出異常(失敗指的是已經有告終果了)  
Promise<V> setFailure(Throwable cause);  
  
// 標記該 future 失敗,及其失敗緣由。  
// 若是已經有結果,返回 false,不拋出異常  
boolean tryFailure(Throwable cause);  
  
// 標記該 future 不能夠被取消  
boolean setUncancellable();  
  
// 這裏和 ChannelFuture 同樣,對這幾個方法進行覆寫,目的是爲了返回 Promise 類型的實例  
@Override  
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);  
@Override  
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
  
@Override  
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);  
@Override  
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);  
  
@Override  
Promise<V> await() throws InterruptedException;  
@Override  
Promise<V> awaitUninterruptibly();  
  
@Override  
Promise<V> sync() throws InterruptedException;  
@Override  
Promise<V> syncUninterruptibly();  
}

可能有些讀者對 Promise 的概念不是很熟悉,這裏簡單說兩句。

我以爲只要明白一點,Promise 實例內部是一個任務,任務的執行每每是異步的,一般是一個線程池來處理任務。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未來會被某個執行任務的線程在執行完成之後調用,同時那個線程在調用 setSuccess(result) 或 setFailure(t) 後會回調 listeners 的回調函數(固然,回調的具體內容不必定要由執行任務的線程本身來執行,它能夠建立新的線程來執行,也能夠將回調任務提交到某個線程池來執行)。並且,一旦 setSuccess(...) 或 setFailure(...) 後,那些 await() 或 sync() 的線程就會從等待中返回。

因此這裏就有兩種編程方式,一種是用 await(),等 await() 方法返回後,獲得 promise 的執行結果,而後處理它;另外一種就是提供 Listener 實例,咱們不太關心任務何時會執行完,只要它執行完了之後會去執行 listener 中的處理方法就行。

接下來,咱們再來看下 ChannelPromise,它繼承了前面介紹的 ChannelFuture 和 Promise 接口。

ChannelPromise 接口在 Netty 中使用得比較多,由於它綜合了 ChannelFuture 和 Promise 兩個接口:

/\*\*  
 \* Special {@link ChannelFuture} which is writable.  
 \*/  
publicinterface ChannelPromise extends ChannelFuture, Promise<Void\> {  
// 覆寫 ChannelFuture 中的 channel() 方法,其實這個方法一點沒變  
@Override  
Channel channel();  
  
// 下面幾個方法是覆寫 Promise 中的接口,爲了返回值類型是 ChannelPromise  
@Override  
ChannelPromise setSuccess(Void result);  
ChannelPromise setSuccess();  
boolean trySuccess();  
@Override  
ChannelPromise setFailure(Throwable cause);  
  
// 到這裏你們應該都熟悉了,下面幾個方法的覆寫也是爲了獲得 ChannelPromise 類型的實例  
@Override  
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);  
@Override  
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);  
@Override  
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);  
@Override  
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);  
  
@Override  
ChannelPromise sync() throws InterruptedException;  
@Override  
ChannelPromise syncUninterruptibly();  
@Override  
ChannelPromise await() throws InterruptedException;  
@Override  
ChannelPromise awaitUninterruptibly();  
  
/\*\*  
     \* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.  
     \*/  
// 咱們忽略這個方法吧。  
ChannelPromise unvoid();  
}

咱們能夠看到,它綜合了 ChannelFuture 和 Promise 中的方法,只不過經過覆寫將返回值都變爲 ChannelPromise 了而已,沒有增長什麼新的功能

小結一下,咱們上面介紹了幾個接口,Future 以及它的子接口 ChannelFuture 和 Promise,而後是 ChannelPromise 接口同時繼承了 ChannelFuture 和 Promise。

我把這幾個接口的主要方法列在一塊兒,這樣你們看得清晰些:

接下來,咱們須要來一個實現類,這樣才能比較直觀地看出它們是怎麼使用的,由於上面的這些都是接口定義,具體還得看實現類是怎麼工做的。

下面,咱們來介紹下 DefaultPromise 這個實現類,這個類很經常使用,它的源碼也不短,咱們先介紹幾個關鍵的內容,而後介紹一個示例使用。

首先,咱們看下它有哪些屬性:

publicclass DefaultPromise<V\> extends AbstractFuture<V\> implements Promise<V\> {  
// 保存執行結果  
privatevolatile Object result;  
// 執行任務的線程池,promise 持有 executor 的引用,這個其實有點奇怪了  
// 由於「任務」其實不必知道本身在哪裏被執行的  
privatefinal EventExecutor executor;  
// 監聽者,回調函數,任務結束後(正常或異常結束)執行  
private Object listeners;  
  
// 等待這個 promise 的線程數(調用sync()/await()進行等待的線程數量)  
privateshort waiters;  
  
// 是否正在喚醒等待線程,用於防止重複執行喚醒,否則會重複執行 listeners 的回調方法  
privateboolean notifyingListeners;  
    ......  
}
能夠看出,此類實現了 Promise,可是沒有實現 ChannelFuture,因此它和 Channel 聯繫不起來。

別急,咱們後面會碰到另外一個類 DefaultChannelPromise 的使用,這個類是綜合了 ChannelFuture 和 Promise 的,可是它的實現其實大部分都是繼承自這裏的 DefaultPromise 類的。

說完上面的屬性之後,你們能夠看下 setSuccess(V result)trySuccess(V result)setFailure(Throwable cause)tryFailure(Throwable cause) 這幾個方法:

看出 setSuccess(result) 和 trySuccess(result) 的區別了嗎?

上面幾個方法都很是簡單,先設置好值,而後執行監聽者們的回調方法。notifyListeners() 方法感興趣的讀者也能夠看一看,不過它還涉及到 Netty 線程池的一些內容,咱們尚未介紹到線程池,這裏就不展開了。上面的代碼,在 setSuccess0 或 setFailure0 方法中都會喚醒阻塞在 sync() 或 await() 的線程

另外,就是能夠看下 sync() 和 await() 的區別,其餘的我以爲隨便看看就行了。

@Override  
public Promise<V> sync() throws InterruptedException {  
    await();  
// 若是任務是失敗的,從新拋出相應的異常  
    rethrowIfFailed();  
returnthis;  
}

接下來,咱們來寫個實例代碼吧:

  public static void main(String\[\] args) {  
  
        // 構造線程池  
        EventExecutor executor = new DefaultEventExecutor();  
  
        // 建立 DefaultPromise 實例  
        Promise promise = new DefaultPromise(executor);  
  
        // 下面給這個 promise 添加兩個 listener  
        promise.addListener(new GenericFutureListener<Future<Integer>>() {  
            @Override  
            public void operationComplete(Future future) throws Exception {  
                if (future.isSuccess()) {  
                    System.out.println("任務結束,結果:" + future.get());  
                } else {  
                    System.out.println("任務失敗,異常:" + future.cause());  
                }  
            }  
        }).addListener(new GenericFutureListener<Future<Integer>>() {  
            @Override  
            public void operationComplete(Future future) throws Exception {  
                System.out.println("任務結束,balabala...");  
            }  
        });  
  
        // 提交任務到線程池,五秒後執行結束,設置執行 promise 的結果  
        executor.submit(new Runnable() {  
            @Override  
            public void run() {  
                try {  
                    Thread.sleep(5000);  
                } catch (InterruptedException e) {  
                }  
                // 設置 promise 的結果  
                // promise.setFailure(new RuntimeException());  
                promise.setSuccess(123456);  
            }  
        });  
  
        // main 線程阻塞等待執行結果  
        try {  
            promise.sync();  
        } catch (InterruptedException e) {  
        }  
    }

運行代碼,兩個 listener 將在 5 秒後將輸出:

任務結束,結果:123456
任務結束,balabala...

讀者這裏能夠試一下 sync() 和 await() 的區別,在任務中調用 promise.setFailure(new RuntimeException()) 試試看。

上面的代碼中,你們可能會對線程池 executor 和 promise 之間的關係感到有點迷惑。讀者應該也要清楚,具體的任務不必定就要在這個 executor 中被執行。任務結束之後,須要調用 promise.setSuccess(result) 做爲通知。

一般來講,promise 表明的 future 是不須要和線程池攪在一塊兒的,future 只關心任務是否結束以及任務的執行結果,至因而哪一個線程或哪一個線程池執行的任務,future 實際上是不關心的。

不過 Netty 畢竟不是要建立一個通用的線程池實現,而是和它要處理的 IO 息息相關的,因此咱們只不過要理解它就行了。

這節就說這麼多吧,咱們回過頭來再看一下這張圖,看看你們是否是看懂了這節內容:

咱們就說說上圖左邊的部分吧,雖然咱們還不知道 bind() 操做中具體會作什麼工做,可是咱們應該能夠猜出一二。

顯然,main 線程調用 b.bind(port) 這個方法會返回一個 ChannelFuture,bind() 是一個異步方法,當某個執行線程執行了真正的綁定操做後,那個執行線程必定會標記這個 future 爲成功(咱們假定 bind 會成功),而後這裏的 sync() 方法(main 線程)就會返回了。

若是 bind(port) 失敗,咱們知道,sync() 方法會將異常拋出來,而後就會執行到 finally 塊了。

一旦綁定端口 bind 成功,進入下面一行,f.channel() 方法會返回該 future 關聯的 channel。

channel.closeFuture() 也會返回一個 ChannelFuture,而後調用了 sync() 方法,這個 sync() 方法返回的條件是:有其餘的線程關閉了 NioServerSocketChannel,每每是由於須要停掉服務了,而後那個線程會設置 future 的狀態( setSuccess(result) 或 setFailure(cause) ),這個 sync() 方法纔會返回。

這篇文章就到這裏,但願你們對 Netty 中的異步編程有些瞭解,後續碰到源碼的時候能知道是怎麼使用的了。

相關文章
相關標籤/搜索