Netty源碼分析第4章(pipeline)---->第5節: 傳播outbound事件

 

Netty源碼分析第五章: pipelinehtml

 

第五節: 傳播outBound事件promise

瞭解了inbound事件的傳播過程, 對於學習outbound事件傳輸的流程, 也不會太困難異步

在咱們業務代碼中, 有可能使用wirte方法往寫數據:oop

public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("test data"); }

固然, 直接調用write方法是不能往對方channel中寫入數據的, 由於這種方式只能寫入到緩衝區, 還要調用flush方法才能將緩衝區數據刷到channel中, 或者直接調用writeAndFlush方法, 有關邏輯, 咱們會在後面章節中詳細講解, 這裏只是以wirte方法爲例爲了演示outbound事件的傳播的流程源碼分析

這裏咱們一樣給出兩種寫法:學習

public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1
    ctx.channel().write("test data"); //寫法2
    ctx.write("test data"); }

這兩種寫法有什麼區別, 咱們首先跟到第一種寫法中去:this

ctx.channel().write("test data");

這裏獲取ctx所綁定的channelspa

咱們跟到AbstractChannel的write方法中:線程

public ChannelFuture write(Object msg) { return pipeline.write(msg); }

這裏pipeline是DefaultChannelPipelinecode

跟到其write方法中:

public final ChannelFuture write(Object msg) { //從tail節點開始(從最後的節點往前寫)
    return tail.write(msg); }

這裏調用tail節點write方法, 這裏咱們應該能分析到, outbound事件, 是經過tail節點開始往上傳播的, 帶着這點猜測, 咱們繼往下看

 

其實tail節點並無重寫write方法, 最終會調用其父類AbstractChannelHandlerContextwrite方法

AbstractChannelHandlerContextwrite方法:

public ChannelFuture write(Object msg) { return write(msg, newPromise()); }

咱們看到這裏有個newPromise()這個方法, 這裏是建立一個Promise對象, 有關Promise的相關知識咱們會在之後的章節剖析

咱們繼續跟write:

public ChannelFuture write(final Object msg, final ChannelPromise promise) { //代碼省略
    write(msg, false, promise); return promise; }

繼續跟write:

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush
 next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

這裏跟咱們上一小節剖析過channelRead方法有點相似, 可是事件傳輸的方向有所不一樣, 這裏findContextOutbound()是獲取上一個標註outbound事件的HandlerContext

跟到findContextOutbound中:

private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }

這裏的邏輯咱們似曾相識, 跟咱們上一小節的findContextInbound()方法有點像, 只是過程是反過來的

在這裏, 會找到當前context的上一個節點, 若是標註的事件不是outbound事件, 則繼續往上找, 意思就是找到上一個標註outbound事件的節點

 

 

回到write方法:

AbstractChannelHandlerContext next = findContextOutbound();

這裏將找到節點賦值到next屬性中

由於咱們以前分析的write事件是從tail節點傳播的, 因此上一個節點就有多是用戶自定的handler所屬的context

 

而後判斷是否爲當前eventLoop線程, 若是是否是, 則封裝成task異步執行, 若是不是, 則繼續判斷是否調用了flush方法, 由於咱們這裏沒有調用, 因此會執行到next.invokeWrite(m, promise),

咱們繼續跟invokeWrite:

private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }

這裏會判斷當前handler的狀態是不是添加狀態, 這裏返回的是true, 將會走到invokeWrite0(msg, promise)這一步

繼續跟invokeWrite0:

private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調用當前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }

這裏的邏輯也似曾相識, 調用了當前節點包裝的handlerwrite方法, 若是用戶沒有重寫write方法, 則會交給其父類處理

咱們跟到ChannelOutboundHandlerAdapterwrite方法中看:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }

這裏調用了當前ctxwrite方法, 這種寫法和咱們小節開始的寫法是相同的, 咱們回顧一下:

public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1
    ctx.channel().write("test data"); //寫法2
    ctx.write("test data"); }

咱們跟到其write方法中, 這裏走到的是AbstractChannelHandlerContext類的write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush
 next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

又是咱們所熟悉邏輯, 找到當前節點的上一個標註事件爲outbound事件的節點, 繼續執行invokeWrite方法, 根據以前的剖析, 咱們知道最終會執行到上一個handlerwrite方法中

走到這裏已經不難理解, ctx.channel().write("test data")實際上是從tail節點開始傳播寫事件, ctx.write("test data")是從自身開始傳播寫事件

 

因此, handler中若是重寫了write方法要傳遞write事件, 必定採用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能採用ctx.channel().write("test data")這種方式, 由於會形成每次事件傳輸到這裏都會從tail節點從新傳輸, 致使不可預知的錯誤

 

 

若是用代碼中沒有重寫handlerwrite方法, 則事件會一直往上傳輸, 當傳輸完全部的outbound節點以後, 最後會走到head節點的wirte方法中

咱們跟到HeadContextwrite方法中:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }

咱們看到write事件最終會流向這裏, 經過unsafe對象進行最終的寫操做

 

有關inbound事件和outbound事件的傳輸, 可經過下圖進行說明:

4-5-1

 

上一節: 傳播inbound事件

下一節: 傳播異常事件

相關文章
相關標籤/搜索