Netty源碼分析第七章: 編碼器和寫數據html
概述:promise
上一小章咱們介紹瞭解碼器, 這一章咱們介紹編碼器緩存
其實編碼器和解碼器比較相似, 編碼器也是一個handler, 而且屬於outbounfHandle, 就是將準備發出去的數據進行攔截, 攔截以後進行相應的處理以後再次進發送處理, 若是理解了解碼器, 那麼編碼器的相關內容理解起來也比較容易異步
第一節: writeAndFlush的事件傳播oop
咱們以前在學習pipeline的時候, 講解了write事件的傳播過程, 但在實際使用的時候, 咱們一般不會調用channel的write方法, 由於該方法只會寫入到發送數據的緩存中, 並不會直接寫入channel中, 若是想寫入到channel中, 還須要調用flush方法源碼分析
實際使用過程當中, 咱們用的更多的是writeAndFlush方法, 這方法既能將數據寫到發送緩存中, 也能刷新到channel中學習
咱們看一個最簡單的使用的場景:this
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().writeAndFlush("test data"); }
學過netty的同窗們對此確定不陌生, 經過這種方式, 能夠將數據發送到channel中, 對方能夠收到響應編碼
咱們跟到writeAndFlush方法中, 首先會走到AbstractChannel的writeAndFlush:spa
public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
繼續跟到DefualtChannelPipeline中的writeAndFlush方法中:
public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
這裏咱們看到, writeAndFlush是從tail節點進行傳播, 有關事件傳播, 咱們再pipeline中進行過剖析, 相信這個不會陌生
繼續跟, 會跟到AbstractChannelHandlerContext中的writeAndFlush方法:
public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }
繼續跟:
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled
return promise; } write(msg, true, promise); return promise; }
繼續跟write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) { //findContextOutbound()尋找前一個outbound節點 //最後到head節點結束
AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush
next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這裏的邏輯咱們也不陌生, 找到下一個節點, 由於writeAndFlush是從tail節點開始的, 而且是outBound的事件, 因此這裏會找到tail節點的上一個outBoundHandler, 有多是編碼器, 也有多是咱們業務處理的handler
if (executor.inEventLoop()) 判斷是不是eventLoop線程, 若是不是, 則封裝成task經過nioEventLoop異步執行, 咱們這裏先按照是eventLoop線程分析
首先, 這裏經過flush判斷是否調用了flush, 這裏顯然是true, 由於咱們調用的方法是writeAndFlush
咱們跟到invokeWriteAndFlush中:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //寫入
invokeWrite0(msg, promise); //刷新
invokeFlush0(); } else { writeAndFlush(msg, promise); } }
這裏就真相大白了, 其實在writeAndFlush中, 首先調用write, write完成以後再調用flush方法進行的刷新
首先跟到invokeWrite0方法中:
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調用當前handler的wirte()方法
((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
該方法咱們在pipeline中已經進行過度析, 就是調用當前handler的write方法, 若是當前handler中write方法是繼續往下傳播, 在會繼續傳播寫事件, 直到傳播到head節點, 最後會走到HeadContext的write方法中
跟到HeadContext的write方法中:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
這裏經過當前channel的unsafe對象對將當前消息寫到緩存中, 寫入的過程, 咱們以後的小節進行分析
回到到invokeWriteAndFlush方法中:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //寫入
invokeWrite0(msg, promise); //刷新
invokeFlush0(); } else { writeAndFlush(msg, promise); } }
咱們再看invokeFlush0方法:
private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } }
一樣, 這裏會調用當前handler的flush方法, 若是當前handler的flush方法是繼續傳播flush事件, 則flush事件會繼續往下傳播, 直到最後會調用head節點的flush方法, 若是咱們熟悉pipeline的話, 對這裏的邏輯不會陌生
跟到HeadContext的flush方法中:
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
這裏一樣, 會經過當前channel的unsafe對象經過調用flush方法將緩存的數據刷新到channel中, 有關刷新的邏輯, 咱們會在之後的小節進行剖析
以上就是writeAndFlush的相關邏輯, 總體上比較簡單, 熟悉pipeline的同窗應該很容易理解