Netty源碼分析第7章(編碼器和寫數據)---->第1節: writeAndFlush的事件傳播

 

Netty源碼分析第七章: 編碼器和寫數據html

 

 

概述:promise

 

        上一小章咱們介紹瞭解碼器, 這一章咱們介紹編碼器緩存

        其實編碼器和解碼器比較相似, 編碼器也是一個handler, 而且屬於outbounfHandle, 就是將準備發出去的數據進行攔截, 攔截以後進行相應的處理以後再次進發送處理, 若是理解了解碼器, 那麼編碼器的相關內容理解起來也比較容易異步

 

第一節: writeAndFlush的事件傳播oop

 

咱們以前在學習pipeline的時候, 講解了write事件的傳播過程, 但在實際使用的時候, 咱們一般不會調用channelwrite方法, 由於該方法只會寫入到發送數據的緩存中, 並不會直接寫入channel, 若是想寫入到channel, 還須要調用flush方法源碼分析

實際使用過程當中, 咱們用的更多的是writeAndFlush方法, 這方法既能將數據寫到發送緩存中, 也能刷新到channel學習

 

咱們看一個最簡單的使用的場景:this

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().writeAndFlush("test data"); }

學過netty的同窗們對此確定不陌生, 經過這種方式, 能夠將數據發送到channel, 對方能夠收到響應編碼

 

咱們跟到writeAndFlush方法中, 首先會走到AbstractChannelwriteAndFlush:spa

public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }

繼續跟到DefualtChannelPipeline中的writeAndFlush方法中:

public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }

這裏咱們看到, writeAndFlush是從tail節點進行傳播, 有關事件傳播, 咱們再pipeline中進行過剖析, 相信這個不會陌生

繼續跟, 會跟到AbstractChannelHandlerContext中的writeAndFlush方法:

public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }

繼續跟:

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled
        return promise; } write(msg, true, promise); return promise; }

繼續跟write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { //findContextOutbound()尋找前一個outbound節點 //最後到head節點結束
    AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush
 next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

這裏的邏輯咱們也不陌生, 找到下一個節點, 由於writeAndFlush是從tail節點開始的, 而且是outBound的事件, 因此這裏會找到tail節點的上一個outBoundHandler, 有多是編碼器, 也有多是咱們業務處理的handler

 if (executor.inEventLoop()) 判斷是不是eventLoop線程, 若是不是, 則封裝成task經過nioEventLoop異步執行, 咱們這裏先按照是eventLoop線程分析

首先, 這裏經過flush判斷是否調用了flush, 這裏顯然是true, 由於咱們調用的方法是writeAndFlush

 

咱們跟到invokeWriteAndFlush:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //寫入
 invokeWrite0(msg, promise); //刷新
 invokeFlush0(); } else { writeAndFlush(msg, promise); } }

這裏就真相大白了, 其實在writeAndFlush, 首先調用write, write完成以後再調用flush方法進行的刷新

首先跟到invokeWrite0方法中:

private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調用當前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }

該方法咱們在pipeline中已經進行過度析, 就是調用當前handlerwrite方法, 若是當前handlerwrite方法是繼續往下傳播, 在會繼續傳播寫事件, 直到傳播到head節點, 最後會走到HeadContextwrite方法中

HeadContextwrite方法中:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }

 

這裏經過當前channelunsafe對象對將當前消息寫到緩存中, 寫入的過程, 咱們以後的小節進行分析

 

回到到invokeWriteAndFlush方法中:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //寫入
 invokeWrite0(msg, promise); //刷新
 invokeFlush0(); } else { writeAndFlush(msg, promise); } }

咱們再看invokeFlush0方法:

private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } }

一樣, 這裏會調用當前handlerflush方法, 若是當前handlerflush方法是繼續傳播flush事件, flush事件會繼續往下傳播, 直到最後會調用head節點的flush方法, 若是咱們熟悉pipeline的話, 對這裏的邏輯不會陌生

 

跟到HeadContextflush方法中:

public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }

這裏一樣, 會經過當前channelunsafe對象經過調用flush方法將緩存的數據刷新到channel, 有關刷新的邏輯, 咱們會在之後的小節進行剖析

 

以上就是writeAndFlush的相關邏輯, 總體上比較簡單, 熟悉pipeline的同窗應該很容易理解

 

上一節: 分隔符解碼器

下一節: MessageToByteEncoder

相關文章
相關標籤/搜索