Netty源碼分析第7章(編碼器和寫數據)---->第5節: Future和Promies

 

Netty源碼分析第七章: 編碼器和寫數據html

 

第五節: Future和Promisejava

 

Netty中的Future, 其實相似於jdk的Future, 用於異步獲取執行結果設計模式

Promise則至關於一個被觀察者, 其中promise對象會一直跟隨着channel的讀寫事件, 並跟蹤着事件狀態, 而後執行相應的回調數組

這種設計思路也就是java設計模式的觀察者模式promise

首先咱們看一段寫在handler中的業務代碼:併發

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("寫出成功"); }else{ System.out.println("寫出失敗"); } } }); }

熟悉netty的小夥伴估計對這段代碼並不陌生, 首先調用writeAndFlush方法將數據寫出, 而後返回的future進行添加Listener, 而且重寫回調函數異步

這裏舉一個最簡單的示例, 在回調函數中判斷future的狀態成功與否, 成功的話就打印"寫出成功", 不然節打印"寫出失敗"ide

這裏若是寫在handler中一般是NioEventLoop線程執行的, 在future返回以後纔會執行添加listener的操做, 若是在用戶線程中writeAndFlush是異步執行的, 在添加監聽的時候有可能寫出操做沒有執行完畢, 等寫出操做執行完畢以後纔會執行回調函數

以上邏輯在代碼中如何體現的呢?咱們首先跟到writeAndFlush的方法中去oop

這裏會走到AbstractChannelHandlerContext中的writeAndFlush方法中:

public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }

這裏的邏輯以前剖析過, 想必你們並不陌生

這裏關注newPromise()方法, 跟進去:

public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }

這裏直接建立了DefaultChannelPromise這個對象並傳入了當前channel和當前channel綁定NioEventLoop對象

在DefaultChannelPromise構造方法中, 也會將channel和NioEventLoop對象綁定在自身成員變量中

回到writeAndFlush方法繼續跟:

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); return promise; } write(msg, true, promise); return promise; }

這裏的邏輯也不陌生, 注意這裏最後返回了promise, 其實就是咱們上一步建立DefaultChannelPromise對象

DefaultChannelPromise實現了ChannelFuture接口, 因此方法若是返回該對象能夠被ChannelFuture類型接收

咱們繼續跟write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

這裏的邏輯咱們一樣不陌生, 若是nioEventLoop線程, 咱們繼續調invokeWriteAndFlush方法, 若是不是nioEventLoop線程則將writeAndFlush事件封裝成task, 交給eventLoop線程異步

這裏若是是異步執行, 則到這一步以後, 咱們的業務代碼中, writeAndFlush就會返回並添加監聽, 有關添加監聽的邏輯稍後分析

走到這裏, 不管同步異步, 都會執行到invokeWriteAndFlush方法:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }

這裏也是咱們熟悉的邏輯, 咱們看到在invokeWrite0方法中傳入了咱們剛纔建立的DefaultChannelPromise

後續邏輯想必你們都比較熟悉, 經過事件傳播, 最終會調用head節點的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }

這裏最終調用unsafe的write方法, 並傳入了promise對象

跟到AbstractUnsafe的write方法中:

public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //負責緩衝寫進來的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //插入寫隊列
 outboundBuffer.addMessage(msg, size, promise); }

這裏的邏輯以前小節也剖析過, 這裏咱們首先關注兩個部分, 首先看在catch中safeSetFailure這步

由於是catch塊, 說明發生了異常, 寫到緩衝區不成功, safeSetFailure就是設置寫出失敗的狀態

咱們跟到safeSetFailure方法中:

protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }

這裏看if判斷, 首先咱們的promise是DefaultChannelPromise, 因此!(promise instanceof VoidChannelPromise)爲true

重點分析promise.tryFailure(cause), 這裏是設置失敗狀態, 這裏會調用DefaultPromise的tryFailure方法

跟進tryFailure方法:

public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; }

再跟到setFailure0(cause)中:

private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }

這裏在if塊中的cas操做, 會將參數objResult的值設置到DefaultPromise的成員變量result中, 表示當前操做爲異常狀態

回到tryFailure方法:

這裏關注notifyListeners()這個方法, 這個方法是執行添加監聽的回調函數, 當writeAndFlush和addListener是異步執行的時候, 這裏有可能添加已經添加, 因此經過這個方法能夠調用添加監聽後的回調

若是writeAndFlush和addListener是同步執行的時候, 也就是都在NioEventLoop線程中執行的時候, 那麼走到這裏addListener還沒執行, 因此這裏不能回調添加監聽的回調函數, 那麼回調是何時執行的呢?咱們在剖析addListener步驟的時候會給你們分析

 

具體執行回調咱們再講解添加監聽的時候進行剖析

以上就是記錄異常狀態的大概邏輯

回到AbstractUnsafe的write方法:

咱們再關注這一步:

outboundBuffer.addMessage(msg, size, promise);

跟到addMessage方法中:

public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); //代碼省略
}

咱們只須要關注包裝Entry的newInstance方法, 該方法傳入promise對象

跟到newInstance中:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; }

這裏將promise設置到Entry的成員變量中了, 也就是說, 每一個Entry都關聯了惟一的一個promise

咱們回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }

咱們剛纔分析了write操做中promise的傳遞以及狀態設置的大概過程, 咱們繼續看在flush中promise的操做過程

這裏invokeFlush0()並無傳入promise對象, 是由於咱們剛纔分析過, promise對象會綁定在緩衝區中entry的成員變量中, 能夠經過其成員變量拿到promise對象

invokeFlush0()咱們以前也分析過, 經過事件傳遞, 最終會調用HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }

最後跟到AbstractUnsafe的flush方法:

public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }

這塊邏輯以前已分析過, 繼續看flush0方法:

protected void flush0() { //代碼省略
    try { doWrite(outboundBuffer); } catch (Throwable t) { //代碼省略
    } finally { inFlush0 = false; } }

篇幅緣由咱們省略大段代碼

咱們繼續跟進doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //代碼省略
            boolean done = false; //代碼省略
            if (done) { //移除當前對象
 in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代碼省略
        } else { throw new Error(); } } incompleteWrite(setOpWrite); }

這裏也省略了大段代碼, 咱們重點關注in.remove()這裏, 以前介紹過, 若是done爲true, 說明刷新事件已完成, 則移除當前entry節點

咱們跟到remove()方法中:

public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }

這裏咱們看這一步:

ChannelPromise promise = e.promise;

以前咱們剖析promise對象會綁定在entry中, 而這步就是從entry中獲取promise對象

等remove操做完成, 會執行到這一步:

safeSuccess(promise);

這一步正好和咱們剛纔分析的safeSetFailure相反, 這裏是設置成功狀態

跟到safeSuccess方法中:

private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise)) { PromiseNotificationUtil.trySuccess(promise, null, logger); } }

再跟到trySuccess方法中:

public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { //代碼省略
 } }

這裏再繼續跟if中的trySuccess方法, 最後會走到DefaultPromise的trySuccess方法:

public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }

這裏跟到setSuccess0方法中:

private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }

這裏的邏輯咱們剛纔剖析過了, 這裏參數傳入一個信號SUCCESS, 表示設置成功狀

再繼續跟setValue方法:

private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }

一樣, 在if判斷中, 經過cas操做將參數傳入的SUCCESS對象賦值到DefaultPromise的屬性result中, 咱們看這個屬性:

 private volatile Object result; 

這裏是Object類型, 也就是能夠賦值成任何類型

SUCCESS是一個Signal類型的對象, 這裏咱們能夠簡單理解成一種狀態, SUCCESS表示一種成功的狀態

經過上述cas操做, result的值將賦值成SUCCESS

咱們回到trySuccess方法:

public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }

設置完成功狀態以後, 則會經過notifyListeners()執行監聽中的回調

咱們看用戶代碼:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("寫出成功"); }else{ System.out.println("寫出失敗"); } } }); }

在回調中會判斷future.isSuccess(), promise設置爲成功狀態這裏會返回true, 從而打印寫出成功"

跟到isSuccess方法中, 這裏會調用DefaultPromise的isSuccess方法:

public boolean isSuccess() { Object result = this.result; return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); }

咱們看到首先會拿到result對象, 而後判斷result不爲空, 而且不是UNCANCELLABLE, 而且不屬於CauseHolder對象

咱們剛纔分析若是promise設置爲成功裝載, 則result爲SUCCESS, 因此這裏條件成立, 能夠執行 if (future.isSuccess()) 中if塊的邏輯

 

和設置錯誤狀態的邏輯同樣, 這裏也有一樣的問題, 若是writeAndFlush是和addListener是異步操做, 那麼執行到回調的時候, 可能addListener已經添加完成, 因此能夠正常的執行回調

那麼若是writeAndFlush是和addListener是同步操做, writeAndFlush在執行回調的時候, addListener並無執行, 因此沒法執行回調方法, 那麼回調方法是如何執行的呢, 咱們看addListener這個方法:

addListener傳入ChannelFutureListener對象, 並重寫了operationComplete方法, 也就是執行回調的方法

這裏會執行到DefaultChannelPromise的addListener方法, 跟進去

public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; }

跟到父類的addListener中:

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }

這裏經過addListener0方法添加listener, 由於添加listener有可能會在不一樣的線程中操做, 好比用戶線程和NioEventLoop線程, 爲了防止併發問題, 這裏簡單粗暴的加了個synchronized關鍵字

跟到addListener0方法中:

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }

若是是第一次添加listener, 則成員變量listeners爲null, 這樣就把參數傳入的GenericFutureListener賦值到成員變量listeners

若是是第二次添加listener, listeners不爲空, 會走到else if判斷, 由於第一次添加的listener是GenericFutureListener類型, 並非DefaultFutureListeners類型, 因此else if判斷返回false, 進入到else塊中

else塊中, 經過new的方式建立一個DefaultFutureListeners對象並賦值到成員變量listeners中

DefaultFutureListeners的構造方法中, 第一個參數傳入DefaultPromise中的成員變量listeners, 也就是第一次添加的GenericFutureListener對象, 第二個參數爲第二次添加的GenericFutureListener對象, 這裏經過兩個GenericFutureListener對象包裝成一個DefaultFutureListeners對象

咱們看listeners的定義:

private Object listeners;

這裏是個Object類型, 因此能夠保存任何類型的對象

再看DefaultFutureListeners的構造方法:

DefaultFutureListeners( GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { listeners = new GenericFutureListener[2]; //第0個
    listeners[0] = first; //第1個
    listeners[1] = second; size = 2; //代碼省略
}

在DefaultFutureListeners類中也定義了一個成員變量listeners, 類型爲GenericFutureListener數組

構造方法中初始化listeners這個數組, 而且數組中第一個值賦值爲咱們第一次添加的GenericFutureListener, 第二個賦值爲咱們第二次添加的GenericFutureListener

回到addListener0方法中:

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }

通過兩次添加listener, 屬性listeners的值就變成了DefaultFutureListeners類型的對象, 若是第三次添加listener, 則會走到else if塊中, DefaultFutureListeners對象經過調用add方法繼續添加listener

跟到add方法中:

public void add(GenericFutureListener<? extends Future<?>> l) { GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; final int size = this.size; if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } listeners[size] = l; this.size = size + 1; //代碼省略
}

這裏的邏輯也比較簡單, 就是爲當前的數組對象listeners中追加新的GenericFutureListener對象, 若是listeners容量不足則進行擴容操做

根據以上邏輯, 就完成了listener的添加邏輯

那麼再看咱們剛纔遺留的問題, 若是writeAndFlush和addListener是同步進行的, writeAndFlush執行回調時尚未addListener尚未執行回調, 那麼回調是如何執行的呢?

回到DefaultPromise的addListener中:

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }

咱們分析完了addListener0方法, 再往下看

這個會有if判斷isDone(), isDone方法, 就是程序執行到這一步的時候, 判斷刷新事件是否執行完成

跟到isDone方法中:

public boolean isDone() { return isDone0(result); }

繼續跟isDone0, 這裏傳入了成員變量result

private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }

這裏判斷result不爲null而且不爲UNCANCELLABLE, 則就表示完成

由於成功的狀態是SUCCESS, 因此flush成功這裏會返回true

回到 addListener中:

若是執行完成, 就經過notifyListeners()方法執行回調, 這也解釋剛纔的問題, 在同步操做中, writeAndFlush在執行回調時並無添加listener, 因此添加listener的時候會判斷writeAndFlush的執行狀態, 若是狀態時完成, 則會這裏執行回調

一樣, 在異步操做中, 走到這裏writeAndFlush可能還沒完成, 因此這裏不會執行回調, 由writeAndFlush執行回調

因此, 不管writeAndFlush和addListener誰先完成, 均可以執行到回調方法

跟到notifyListeners()方法中:

private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }

這裏首先判斷是不是eventLoop線程, 若是是eventLoop線程則執行if塊中的邏輯, 若是不是eventLoop線程, 則把執行回調的邏輯封裝成task丟到EventLoop的任務隊列中異步執行

咱們重點關注notifyListenersNow()方法, 跟進去:

private void notifyListenersNow() { Object listeners; synchronized (this) { if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners); } //代碼省略
 } }

在無限for循環中, 首先首先判斷listeners是否是DefaultFutureListeners類型, 根據咱們以前的邏輯, 若是隻添加了一個listener, 則listeners是GenericFutureListener類型

一般在添加的時候只會添加一個listener, 因此咱們跟到else塊中的notifyListener0方法:

private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }

咱們看到, 這裏執行了GenericFutureListener的中咱們重寫的回調函數operationComplete

 

以上就是執行回調的相關邏輯

 

 

第七章總結

 

        這一章講解了有關writeflush的相關邏輯, 並分析了有關添加監聽和異步寫數據的相關步驟

        通過學習, 同窗們應該掌握以下知識:

        write操做是如何將ByteBuf添加到發送緩衝區的

        flush操做是如何將ByteBuf寫出到chanel中的

        抽象編碼器MessageToByteEncoder中如何定義了編碼器的骨架邏輯

        writeAndFlushaddListener在同步和異步操做中是如何執行回調的

 

上一節: 刷新buffer隊列

下一節: FastThreadLocal的使用和建立

相關文章
相關標籤/搜索