Netty源碼分析第七章: 編碼器和寫數據html
第五節: Future和Promisejava
Netty中的Future, 其實相似於jdk的Future, 用於異步獲取執行結果設計模式
Promise則至關於一個被觀察者, 其中promise對象會一直跟隨着channel的讀寫事件, 並跟蹤着事件狀態, 而後執行相應的回調數組
這種設計思路也就是java設計模式的觀察者模式promise
首先咱們看一段寫在handler中的業務代碼:併發
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("寫出成功"); }else{ System.out.println("寫出失敗"); } } }); }
熟悉netty的小夥伴估計對這段代碼並不陌生, 首先調用writeAndFlush方法將數據寫出, 而後返回的future進行添加Listener, 而且重寫回調函數異步
這裏舉一個最簡單的示例, 在回調函數中判斷future的狀態成功與否, 成功的話就打印"寫出成功", 不然節打印"寫出失敗"ide
這裏若是寫在handler中一般是NioEventLoop線程執行的, 在future返回以後纔會執行添加listener的操做, 若是在用戶線程中writeAndFlush是異步執行的, 在添加監聽的時候有可能寫出操做沒有執行完畢, 等寫出操做執行完畢以後纔會執行回調函數
以上邏輯在代碼中如何體現的呢?咱們首先跟到writeAndFlush的方法中去oop
這裏會走到AbstractChannelHandlerContext中的writeAndFlush方法中:
public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }
這裏的邏輯以前剖析過, 想必你們並不陌生
這裏關注newPromise()方法, 跟進去:
public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }
這裏直接建立了DefaultChannelPromise這個對象並傳入了當前channel和當前channel綁定NioEventLoop對象
在DefaultChannelPromise構造方法中, 也會將channel和NioEventLoop對象綁定在自身成員變量中
回到writeAndFlush方法繼續跟:
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); return promise; } write(msg, true, promise); return promise; }
這裏的邏輯也不陌生, 注意這裏最後返回了promise, 其實就是咱們上一步建立DefaultChannelPromise對象
DefaultChannelPromise實現了ChannelFuture接口, 因此方法若是返回該對象能夠被ChannelFuture類型接收
咱們繼續跟write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這裏的邏輯咱們一樣不陌生, 若是nioEventLoop線程, 咱們繼續調invokeWriteAndFlush方法, 若是不是nioEventLoop線程則將writeAndFlush事件封裝成task, 交給eventLoop線程異步
這裏若是是異步執行, 則到這一步以後, 咱們的業務代碼中, writeAndFlush就會返回並添加監聽, 有關添加監聽的邏輯稍後分析
走到這裏, 不管同步異步, 都會執行到invokeWriteAndFlush方法:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
這裏也是咱們熟悉的邏輯, 咱們看到在invokeWrite0方法中傳入了咱們剛纔建立的DefaultChannelPromise
後續邏輯想必你們都比較熟悉, 經過事件傳播, 最終會調用head節點的write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
這裏最終調用unsafe的write方法, 並傳入了promise對象
跟到AbstractUnsafe的write方法中:
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //負責緩衝寫進來的byteBuf
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //插入寫隊列
outboundBuffer.addMessage(msg, size, promise); }
這裏的邏輯以前小節也剖析過, 這裏咱們首先關注兩個部分, 首先看在catch中safeSetFailure這步
由於是catch塊, 說明發生了異常, 寫到緩衝區不成功, safeSetFailure就是設置寫出失敗的狀態
咱們跟到safeSetFailure方法中:
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }
這裏看if判斷, 首先咱們的promise是DefaultChannelPromise, 因此!(promise instanceof VoidChannelPromise)爲true
重點分析promise.tryFailure(cause), 這裏是設置失敗狀態, 這裏會調用DefaultPromise的tryFailure方法
跟進tryFailure方法:
public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; }
再跟到setFailure0(cause)中:
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
這裏在if塊中的cas操做, 會將參數objResult的值設置到DefaultPromise的成員變量result中, 表示當前操做爲異常狀態
回到tryFailure方法:
這裏關注notifyListeners()這個方法, 這個方法是執行添加監聽的回調函數, 當writeAndFlush和addListener是異步執行的時候, 這裏有可能添加已經添加, 因此經過這個方法能夠調用添加監聽後的回調
若是writeAndFlush和addListener是同步執行的時候, 也就是都在NioEventLoop線程中執行的時候, 那麼走到這裏addListener還沒執行, 因此這裏不能回調添加監聽的回調函數, 那麼回調是何時執行的呢?咱們在剖析addListener步驟的時候會給你們分析
具體執行回調咱們再講解添加監聽的時候進行剖析
以上就是記錄異常狀態的大概邏輯
回到AbstractUnsafe的write方法:
咱們再關注這一步:
outboundBuffer.addMessage(msg, size, promise);
跟到addMessage方法中:
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); //代碼省略
}
咱們只須要關注包裝Entry的newInstance方法, 該方法傳入promise對象
跟到newInstance中:
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; }
這裏將promise設置到Entry的成員變量中了, 也就是說, 每一個Entry都關聯了惟一的一個promise
咱們回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
咱們剛纔分析了write操做中promise的傳遞以及狀態設置的大概過程, 咱們繼續看在flush中promise的操做過程
這裏invokeFlush0()並無傳入promise對象, 是由於咱們剛纔分析過, promise對象會綁定在緩衝區中entry的成員變量中, 能夠經過其成員變量拿到promise對象
invokeFlush0()咱們以前也分析過, 經過事件傳遞, 最終會調用HeadContext的flush方法:
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
最後跟到AbstractUnsafe的flush方法:
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
這塊邏輯以前已分析過, 繼續看flush0方法:
protected void flush0() { //代碼省略
try { doWrite(outboundBuffer); } catch (Throwable t) { //代碼省略
} finally { inFlush0 = false; } }
篇幅緣由咱們省略大段代碼
咱們繼續跟進doWrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //代碼省略
boolean done = false; //代碼省略
if (done) { //移除當前對象
in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代碼省略
} else { throw new Error(); } } incompleteWrite(setOpWrite); }
這裏也省略了大段代碼, 咱們重點關注in.remove()這裏, 以前介紹過, 若是done爲true, 說明刷新事件已完成, 則移除當前entry節點
咱們跟到remove()方法中:
public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }
這裏咱們看這一步:
ChannelPromise promise = e.promise;
以前咱們剖析promise對象會綁定在entry中, 而這步就是從entry中獲取promise對象
等remove操做完成, 會執行到這一步:
safeSuccess(promise);
這一步正好和咱們剛纔分析的safeSetFailure相反, 這裏是設置成功狀態
跟到safeSuccess方法中:
private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise)) { PromiseNotificationUtil.trySuccess(promise, null, logger); } }
再跟到trySuccess方法中:
public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { //代碼省略
} }
這裏再繼續跟if中的trySuccess方法, 最後會走到DefaultPromise的trySuccess方法:
public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }
這裏跟到setSuccess0方法中:
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }
這裏的邏輯咱們剛纔剖析過了, 這裏參數傳入一個信號SUCCESS, 表示設置成功狀
再繼續跟setValue方法:
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
一樣, 在if判斷中, 經過cas操做將參數傳入的SUCCESS對象賦值到DefaultPromise的屬性result中, 咱們看這個屬性:
private volatile Object result;
這裏是Object類型, 也就是能夠賦值成任何類型
SUCCESS是一個Signal類型的對象, 這裏咱們能夠簡單理解成一種狀態, SUCCESS表示一種成功的狀態
經過上述cas操做, result的值將賦值成SUCCESS
咱們回到trySuccess方法:
public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }
設置完成功狀態以後, 則會經過notifyListeners()執行監聽中的回調
咱們看用戶代碼:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("寫出成功"); }else{ System.out.println("寫出失敗"); } } }); }
在回調中會判斷future.isSuccess(), promise設置爲成功狀態這裏會返回true, 從而打印寫出成功"
跟到isSuccess方法中, 這裏會調用DefaultPromise的isSuccess方法:
public boolean isSuccess() { Object result = this.result; return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); }
咱們看到首先會拿到result對象, 而後判斷result不爲空, 而且不是UNCANCELLABLE, 而且不屬於CauseHolder對象
咱們剛纔分析若是promise設置爲成功裝載, 則result爲SUCCESS, 因此這裏條件成立, 能夠執行 if (future.isSuccess()) 中if塊的邏輯
和設置錯誤狀態的邏輯同樣, 這裏也有一樣的問題, 若是writeAndFlush是和addListener是異步操做, 那麼執行到回調的時候, 可能addListener已經添加完成, 因此能夠正常的執行回調
那麼若是writeAndFlush是和addListener是同步操做, writeAndFlush在執行回調的時候, addListener並無執行, 因此沒法執行回調方法, 那麼回調方法是如何執行的呢, 咱們看addListener這個方法:
addListener傳入ChannelFutureListener對象, 並重寫了operationComplete方法, 也就是執行回調的方法
這裏會執行到DefaultChannelPromise的addListener方法, 跟進去
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; }
跟到父類的addListener中:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
這裏經過addListener0方法添加listener, 由於添加listener有可能會在不一樣的線程中操做, 好比用戶線程和NioEventLoop線程, 爲了防止併發問題, 這裏簡單粗暴的加了個synchronized關鍵字
跟到addListener0方法中:
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }
若是是第一次添加listener, 則成員變量listeners爲null, 這樣就把參數傳入的GenericFutureListener賦值到成員變量listeners
若是是第二次添加listener, listeners不爲空, 會走到else if判斷, 由於第一次添加的listener是GenericFutureListener類型, 並非DefaultFutureListeners類型, 因此else if判斷返回false, 進入到else塊中
else塊中, 經過new的方式建立一個DefaultFutureListeners對象並賦值到成員變量listeners中
DefaultFutureListeners的構造方法中, 第一個參數傳入DefaultPromise中的成員變量listeners, 也就是第一次添加的GenericFutureListener對象, 第二個參數爲第二次添加的GenericFutureListener對象, 這裏經過兩個GenericFutureListener對象包裝成一個DefaultFutureListeners對象
咱們看listeners的定義:
private Object listeners;
這裏是個Object類型, 因此能夠保存任何類型的對象
再看DefaultFutureListeners的構造方法:
DefaultFutureListeners( GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { listeners = new GenericFutureListener[2]; //第0個
listeners[0] = first; //第1個
listeners[1] = second; size = 2; //代碼省略
}
在DefaultFutureListeners類中也定義了一個成員變量listeners, 類型爲GenericFutureListener數組
構造方法中初始化listeners這個數組, 而且數組中第一個值賦值爲咱們第一次添加的GenericFutureListener, 第二個賦值爲咱們第二次添加的GenericFutureListener
回到addListener0方法中:
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }
通過兩次添加listener, 屬性listeners的值就變成了DefaultFutureListeners類型的對象, 若是第三次添加listener, 則會走到else if塊中, DefaultFutureListeners對象經過調用add方法繼續添加listener
跟到add方法中:
public void add(GenericFutureListener<? extends Future<?>> l) { GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; final int size = this.size; if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } listeners[size] = l; this.size = size + 1; //代碼省略
}
這裏的邏輯也比較簡單, 就是爲當前的數組對象listeners中追加新的GenericFutureListener對象, 若是listeners容量不足則進行擴容操做
根據以上邏輯, 就完成了listener的添加邏輯
那麼再看咱們剛纔遺留的問題, 若是writeAndFlush和addListener是同步進行的, writeAndFlush執行回調時尚未addListener尚未執行回調, 那麼回調是如何執行的呢?
回到DefaultPromise的addListener中:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
咱們分析完了addListener0方法, 再往下看
這個會有if判斷isDone(), isDone方法, 就是程序執行到這一步的時候, 判斷刷新事件是否執行完成
跟到isDone方法中:
public boolean isDone() { return isDone0(result); }
繼續跟isDone0, 這裏傳入了成員變量result
private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }
這裏判斷result不爲null而且不爲UNCANCELLABLE, 則就表示完成
由於成功的狀態是SUCCESS, 因此flush成功這裏會返回true
回到 addListener中:
若是執行完成, 就經過notifyListeners()方法執行回調, 這也解釋剛纔的問題, 在同步操做中, writeAndFlush在執行回調時並無添加listener, 因此添加listener的時候會判斷writeAndFlush的執行狀態, 若是狀態時完成, 則會這裏執行回調
一樣, 在異步操做中, 走到這裏writeAndFlush可能還沒完成, 因此這裏不會執行回調, 由writeAndFlush執行回調
因此, 不管writeAndFlush和addListener誰先完成, 均可以執行到回調方法
跟到notifyListeners()方法中:
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
這裏首先判斷是不是eventLoop線程, 若是是eventLoop線程則執行if塊中的邏輯, 若是不是eventLoop線程, 則把執行回調的邏輯封裝成task丟到EventLoop的任務隊列中異步執行
咱們重點關注notifyListenersNow()方法, 跟進去:
private void notifyListenersNow() { Object listeners; synchronized (this) { if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners); } //代碼省略
} }
在無限for循環中, 首先首先判斷listeners是否是DefaultFutureListeners類型, 根據咱們以前的邏輯, 若是隻添加了一個listener, 則listeners是GenericFutureListener類型
一般在添加的時候只會添加一個listener, 因此咱們跟到else塊中的notifyListener0方法:
private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }
咱們看到, 這裏執行了GenericFutureListener的中咱們重寫的回調函數operationComplete
以上就是執行回調的相關邏輯
第七章總結
這一章講解了有關write和flush的相關邏輯, 並分析了有關添加監聽和異步寫數據的相關步驟
通過學習, 同窗們應該掌握以下知識:
write操做是如何將ByteBuf添加到發送緩衝區的
flush操做是如何將ByteBuf寫出到chanel中的
抽象編碼器MessageToByteEncoder中如何定義了編碼器的骨架邏輯
writeAndFlush和addListener在同步和異步操做中是如何執行回調的