本文地址: https://juejin.im/post/5df771...
本文的 Netty源碼使用的是 4.1.31.Final
版本,不一樣版本會有一些差別.java
在說JDK的異步Future以前,先簡單介紹一下JDK自帶的Future機制.git
首先先上一段代碼github
public class JDKFuture { static ExecutorService executors = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(16)); public static void main(String[] args) throws Exception{ int cnt = 1; Future[] jdkFuture=new Future[cnt]; Object jdkFutureResult; for(int i = 0;i < cnt; i++){ jdkFuture[i] = executors.submit(new JDKCallable(i)); } System.out.println(String.format("%s 在 %s 即將獲取任務執行結果", Thread.currentThread(), new Date())); jdkFutureResult = jdkFuture[0].get(); System.out.println(String.format("%s 在 %s 任務結果獲取完畢 %s", Thread.currentThread(), new Date(), jdkFutureResult)); executors.shutdown(); } static class JDKCallable implements Callable{ int index; JDKCallable(int ind){ this.index = ind; } public Object call() throws Exception { try { System.out.println(String.format("線程 [%s] 提交任務[%s]", Thread.currentThread(), this.index)); // 耗時2秒,模擬耗時操做 Thread.sleep(2000); System.out.println(String.format("線程 [%s] 執行任務[%s]執行完畢", Thread.currentThread(), this.index)); }catch(InterruptedException e){ e.printStackTrace(); } return String.format("任務%s執行結果",this.index); } } }
輸出結果爲:promise
線程 [Thread[pool-1-thread-1,5,main]] 提交任務[0] Thread[main,5,main] 在 Mon Dec 16 16:40:38 CST 2019 即將獲取任務執行結果 線程 [Thread[pool-1-thread-1,5,main]] 執行任務[0]執行完畢 Thread[main,5,main] 在 Mon Dec 16 16:40:40 CST 2019 任務結果獲取完畢 任務0執行結果
能夠看到主線程在使用 future.get()
的時候,由於子線程還未處理完返回結果而致使主線程活生生的等了2秒鐘(耗時操做),這也是JDK自帶的Future機制不夠完善的地方.由於jdk自身的future機制不夠完善,因此Netty自實現了一套Future機制.緩存
Netty的Future是異步的,那他是怎麼實現的呢?接下來就從源碼開始探究.框架
先看一下 Netty 的 Future
和 Promise
這兩個接口異步
/** * The result of an asynchronous operation * 異步操做的結果 * 對狀態的判斷、添加listener、獲取結果 */ public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); boolean isCancellable(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); V getNow(); @Override boolean cancel(boolean mayInterruptIfRunning); }
Promise是一個特殊的Future,它可寫,可寫意味着能夠修改裏面的結果.async
/** * Special {@link Future} which is writable. * 一個可寫的特殊的Future * 繼承 Future, 繼承的方法就不列出 */ public interface Promise<V> extends Future<V> { /** * Marks this future as a success and notifies all * listeners. * If it is success or failed already it will throw an {@link IllegalStateException}. * 將這個 future 標記爲 success 而且通知全部的 listeners * 若是已經成功或者失敗將會拋出異常 */ Promise<V> setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. * 嘗試設置結果,成功返回true, 失敗 false, 上面的方法設置失敗會拋出異常 */ boolean trySuccess(V result); // 這2個跟上面的差很少 Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. */ boolean setUncancellable(); }
看到這裏都同窗都默認是用netty寫過程序的~,還沒寫過的話能夠看看官方文檔或者個人另外一篇Netty使用.ide
接下來就開始源碼的解讀.函數
總所周知!
,咱們使用Netty開發的時候,寫出數據用的是 writeAndFlush(msg)
, 至於 write(msg)
嘛, 不就是少了個 flush (沒錯,是我比較懶).
在你們知道 channel().write
和 ctx.write
的區別後, 咱們就從 channel().write
開始講起.
不行,我感受仍是要說一下一些補充的,否則內心不舒服.
Netty中有一個pipeline
,也就是事件調用鏈,開發的時候在調用鏈裏面加入本身處理事件的handle,可是在這條 pipeline 中, Netty給咱們加上了 Head
和 tail
這兩個handle,方便Netty框架處理事件.
先看 DefaultChannelPipeline 的初始化,在初始化代碼裏給咱們添加了2個handle, head 和 tail, 這2個東西頗有用,爲何這麼說呢?詳情看後面解答
protected DefaultChannelPipeline(Channel channel) { this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel"); this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null); this.voidPromise = new VoidChannelPromise(channel, true); // ChannelInboundHandler this.tail = new DefaultChannelPipeline.TailContext(this); // ChannelInboundHandler && ChannelOutboundHandler this.head = new DefaultChannelPipeline.HeadContext(this); this.head.next = this.tail; this.tail.prev = this.head; }
沒錯,仍是從 channel().write(msg)
開始提及(爲何我要用仍是).
跟蹤代碼 channel().write(), 首先會調用到 DefaultChannelPipeline的 writeAndFlush 方法.
public final ChannelFuture writeAndFlush(Object msg) { return this.tail.writeAndFlush(msg); }
this.tail 就是上面構造函數裏面初始化的 tailHandle
, 而 write
是出棧事件, 會從 tailHandle 開始往前傳遞,最後傳遞到 headHandle
(怎麼感受好像提早劇透了).
public ChannelFuture writeAndFlush(Object msg) { // 這裏new了一個 promise, 而後這個promise將會一直傳遞,一直傳遞..... return this.writeAndFlush(msg, this.newPromise()); }
接下來來到了AbstractChannelHandlerContext
的 writeAndFlush.
/** * 執行 write and flush 操做 * @param msg * @param promise */ private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { // 這個方法在 ChannelHandler#handlerAdded 調用後,纔會返回 true if (invokeHandler()) { // write 繼續傳遞 invokeWrite0(msg, promise); // flush data invokeFlush0(); } else { writeAndFlush(msg, promise); } } private void write(Object msg, boolean flush, ChannelPromise promise) { // 查找下一個 OutboundHandle, 由於是要輸出 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); // 下一個 OutboundHandle 所在的線程 EventExecutor executor = next.executor(); // 若是在是同一個線程(因爲Netty的channel在一個ThreadPool中只綁定一個Thread, 不一樣線程的話也意味着是不一樣線程池) if (executor.inEventLoop()) { // 在同一個線程池(這裏意味着同一個線程)中, if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { // 在不一樣線程池(不一樣線程池那天然就是不一樣線程),須要建立一個任務,提交到下一個線程池 final AbstractWriteTask task; if (flush) { // 提交給下一個線程池 && flush task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } // 由於是 write 事件, so 接下來提交任務到下一個 OutboundHandle(出棧) 所在的線程, 由它執行 if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. // 任務提交失敗,取消任務 task.cancel(); } } }
接下來本篇文章最重要的地方了, HeadContext !
HeadContext的write和flush方法 實際上都是調用 unsafe的方法實現.
// 若是是 writeAndFlush ,調用 write後會調用flush @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 這個調用 AbstrachUnsafe.write unsafe.write(msg, promise); } // 這是 unsafe 的 write 方法 @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // outboundBuffer = null 代表 channel已經關閉而且須要將 future 結果設置爲 false if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // 將 msg添加進 buffer 中 outboundBuffer.addMessage(msg, size, promise); }
若是是WriteAndFlush, 則在調用write後,會調用Head的flush方法,同 write是調用AbstractUnsafe的flush
/** * write 以後再調用這個 flush */ @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // buffer 標記爲能夠被 flush outboundBuffer.addFlush(); // 接下來就是真正的 flush flush0(); }
ChannelOutboundBuffer 是個啥呢?
ChannelOutboundBuffer 簡單來講就是存儲當前channel寫出的數據
, 而且在調用flush的時候將他們都寫出去.
跟着源碼一直走,在flush0以後,最終會調用到 AbstractNioMessageChannel#doWrite
方法.(上面還有doRead方法,是接收數據的時候調用的)
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { // Object msg = in.current(); if (msg == null) { // Wrote all messages. // 判斷寫事件 if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { // 循環寫出數據 boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { // 真正的寫出數據 // 最終會調用 javaChannel().send(nioData, mi); // 很眼熟吧,這個是java nio的方法,註冊的時候也是javaChannel().register() if (doWriteMessage(msg, in)) { done = true; break; } } // 成功寫出,從 buffer 中移除剛纔寫出的數據 if (done) { in.remove(); } else { // Did not write all messages. // 寫出失敗 if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (Exception e) { // 出錯後是否繼續寫出後面的數據 if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } }
到上面位置,數據是寫出去了,那promise的相關做用呢?沒看出來啊?
說實話,這個藏得挺深,竟然! 放在了 buffer.remove() 裏
!
public boolean remove() { // 剛寫出去數據的Entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; // 這個就是writeAndFlush 的時候 new 的 DefaultPromise() ChannelPromise promise = e.promise; int size = e.pendingSize; // buffer 中移除 removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); // !!! 劃重點 !!! // 這裏設置了 promise 的結果, 調用了 trySuccess, 通知全部 listener // !!! 劃重點 !!! safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } // recycle the entry // 重置Entry的信息,方便重用. // 跟 Entry entry = Entry.newInstance(msg, size, total(msg), promise); 相對應, newInstance 是獲取一個緩存的 Entry e.recycle(); return true; }
promise 通知全部 listener 是在寫數據成功,而且在 buffer.remove()
調用的時候在裏面 safeSuccess(promise)
, 最終調用 Promise 的 trySuccess()
從而觸發 notifyListeners()
通知全部 listeners.
這個是在 Promise#trySuccess的時候調用的,通知全部listeners操做已經完成.
/** * 通知監聽者,任務已經完成 */ private void notifyListeners() { // 獲取future所屬線程(池) EventExecutor executor = executor(); // 執行通知是當前線程 則直接回調信息 // currentThread == this.executor if (executor.inEventLoop()) { // 獲取 ThreadLocal 變量 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); // listen 的層級數 final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { // 通知全部的 listener notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 若是 executor 不是當前線程, 則交給 future 所屬 executor 去執行 // 意思是添加通知的 executor 多是前面的 executor , 而後到後面的 executor 也就是當前線程才執行通知 // 此時將通知交回給以前的 executor // 執行通知的不是當前線程, 封裝成一個任務, 由以前提交的線程完成通知(回調) safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
Netty 的 Future 異步機制是在操做完成後,將通知封裝成Task,由Promise所屬線程(Executors)執行(回調).
最後的最後,很是感謝大家能看到這裏!!大家的閱讀都是對做者的一次確定!!!
以爲文章有幫助的看官順手點個贊再走唄(終於暴露了我就是來騙讚的(◒。◒)),大家的每一個贊對做者來講都很是重要(異常真實),都是對做者寫做的一次確定(double)!!!
這一篇的內容到這就結束了,期待下一篇 還能有幸碰見你!
See you later!