netty中的promise和future源碼分析(一)

netty中的操做是異步操做,須要使用future來表示異步操做的結果。netty在jdk中的future的基礎上進行了擴展。接下來採用自頂向下的方式對future和promise的源碼進行分析:javascript

clipboard.png

(一)jdk中future和netty中future的比較

jdk中future:

// 取消異步操做
boolean cancel(boolean mayInterruptIfRunning);
// 異步操做是否取消
boolean isCancelled();
// 異步操做是否完成,正常終止、異常、取消都是完成
boolean isDone();
// 阻塞直到取得異步操做結果
V get() throws InterruptedException, ExecutionException;
// 同上,但最長阻塞時間爲timeout
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

jdk中future的特色:
1.不管結果是成功仍是失敗仍是取消,返回的都是isdone();
2.並且咱們在異步操做觸發和結束的時候比較關心其餘的一些操做,在jdk的future中沒法進行補充。因此netty對future作了擴展。java

netty中future(如下爲擴展內容):

// 異步操做完成且正常終止
boolean isSuccess();
// 異步操做是否能夠取消
boolean isCancellable();
// 異步操做失敗的緣由
Throwable cause();
// 添加一個監聽者,異步操做完成時回調,類比javascript的回調函數
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到異步操做完成
Future<V> await() throws InterruptedException;
// 同上,但異步操做失敗時拋出異常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回異步結果,若是還沒有完成返回null
V getNow();

netty中future的特色:
1.操做結果分爲success,fail,canceled三種;
2.而且經過addlisteners()方法能夠添加回調操做,即觸發或者完成時須要進行的操做;
3.await()和sync(),能夠以阻塞的方式等待異步完成;getnow()能夠得到異步操做的結果,若是還未完成則返回Null;promise

綜合以上的分析,給出一張表示future的狀態圖來增強對future的理解:

clipboard.png

ps:future只有兩種狀態,unconpleted和conpleted.異步

(二)abstractfuture

abstractfuture類實現future接口,在其中主要實現了get()方法,以阻塞的方式來取得異步操做的結果,其功能與jdk中future的get()方法相似。ide

abstractfuture源碼:

@Override函數

public V get() throws InterruptedException, ExecutionException {
    //阻塞直到異步任務完成
    await();
    
    Throwable cause = cause();
    if (cause == null) {
    //得到異步操做結果
        return getNow();
    }
    //操做失敗則拋出異常
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);

}
該類中get(long timeout, TimeUnit unit) 實現方式與get()大體相同。oop

(三)completefuture

completedfuture表示已經完成異步操做,該類在異步操做結束時建立,用戶使用addlistener()方法提供異步操做方法。this

completefuture的狀態

@Override
public boolean isDone() {
    return true;
}

completefuture表示已經完成異步操做,因此isdone()方法返回true;而且sync()方法和await()方法會當即返回。spa

@Override
public Future<V> sync() throws InterruptedException {
    return this;
}

觸發操做的執行者(eventexecutor)

private final EventExecutor executor;

protected CompleteFuture(EventExecutor executor) {
    this.executor = executor;
}

protected EventExecutor executor() {
    return executor;
}

completefuture中維護了一個eventexecutor實例,用來執行listener中的任務。線程

觸發操做的執行過程:

第一步addlistener():

#completefuture類
 public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listener == null) {
        throw new NullPointerException("listener");
    }
    DefaultPromise.notifyListener(executor(), this, listener);
    return this;

}

調用addlistener()方法後,DefaultPromise會調用靜態方法notifyListener(),來執行listener中的操做。

#DefaultPromise類
protected static void notifyListener(
        EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
    //省略
    ……
    notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
}

而後再看看notifyListenerWithStackOverFlowProtection()

#DefaultPromise類
private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
                                                              final Future<?> future,
                                                              final GenericFutureListener<?> listener) {
    if (executor.inEventLoop()) {
        //省略,這段代碼表示在本線程中執行
        ……
    }
    //在外部觸發,則將其封裝成runnable任務
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, listener);
        }
    });
}

接下來的safeexecute()和notifylistener0()就很簡單了,

#DefaultPromise類
private static void safeExecute(EventExecutor executor, Runnable task) {
    try {
    //將任務添加到任務隊列中等待執行
        executor.execute(task);
    } 
    //省略
    ……
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
    //能夠清楚的看到,執行到了listener中的operationcomplete(future)方法
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

completefuture類總結:
1.conpletefuture中保存了eventexecutor的信息,用來執行listener中的任務。
2.調用了future的addlistener()方法後,將listener中的操做封裝成runnble任務扔到eventexecutor中的任務隊列中等待執行
3.completefuture表示已經完成異步操做,狀態是isdone。

(四)channelfuture

這是一個繼承future的接口,顧名思義,該接口與通道操做有關,因此在channelfuture接口中,除了覆蓋future的功能外,只提供了一個channel()抽象方法。

Channel channel();

(五)completechannelfuture

completechannelfuture類實現channelfuture接口,繼承completefuture類。

abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture

異步操做結果的獲取

尖括號中的泛型表示返回結果的類型,此處是void,表示不關心返回的結果。

@Override
public Void getNow() {
    return null;
}

getnow()方法的返回結果爲null,結合前面對abstractfuture類中get()方法的分析,能夠得知在completechannelfuture跟get相關的方法返回的結果都是null.

completechannelfuture的初始化

類中的channel字段:

private final Channel channel;

類的初始化:

protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
    super(executor);
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;
}

eventexecutor的管理:

@Override
protected EventExecutor executor() {
    EventExecutor e = super.executor();
    if (e == null) {
        return channel().eventLoop();
    } else {
        return e;
    }
}

若是父類中eventexecutor不爲空,則返回父類中的eventexecutor,不然返回channel中保存的eventexecutor.

其餘

completechannelfuture中的addlistener()和await(),sync()等方法的特色和completefuture類類似。

(六)Succeededchannelfuture/FailedChannelFuture

這兩個類繼承自completechannelfuture,一個表示的狀態是success,另外一個表示操做失敗。

#FailedChannelFuture
@Override
public Throwable cause() {
    return cause;
}

#Succeededchannelfuture
@Override
public boolean isSuccess() {
    return true;
}

到此爲止,netty中的future分析完畢。

相關文章
相關標籤/搜索