Netty源碼分析第四章: pipelinehtml
第四節: 傳播inbound事件ide
有關於inbound事件, 在概述中作過簡單的介紹, 就是以本身爲基準, 流向本身的事件, 好比最多見的channelRead事件, 就是對方發來數據流的所觸發的事件, 己方要對這些數據進行處理, 這一小節, 以激活channelRead爲例講解有關inbound事件的處理流程oop
在業務代碼中, 咱們本身的handler每每會經過重寫channelRead方法來處理對方發來的數據, 那麼對方發來的數據是如何走到channelRead方法中了呢, 也是咱們這一小節要剖析的內容源碼分析
在業務代碼中, 傳遞channelRead事件方式是經過fireChannelRead方法進行傳播的this
這裏給你們看兩種寫法:spa
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1:
ctx.fireChannelRead(msg); //寫法2
ctx.pipeline().fireChannelRead(msg); }
這裏重寫了channelRead方法, 而且方法體內繼續經過fireChannelRead方法進行傳播channelRead事件, 那麼這兩種寫法有什麼異同?線程
咱們先以寫法2爲例, 將這種寫法進行剖析debug
這裏首先獲取當前context的pipeline對象, 而後經過pipeline對象調用自身的fireChannelRead方法進行傳播, 由於默認建立的DefaultChannelpipelinecode
咱們跟到DefaultChannelpipeline的fireChannelRead方法中:htm
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
這裏首先調用的是AbstractChannelHandlerContext類的靜態方法invokeChannelRead, 參數傳入head節點和事件的消息
咱們跟進invokeChannelRead方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這裏的Object m m一般就是咱們傳入的msg, 而next, 目前是head節點, 而後再判斷是否爲當前eventLoop線程, 若是不是則將方法包裝成task交給eventLoop線程處理
咱們跟到invokeChannelRead方法中:
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
首先經過invokeHandler()判斷當前handler是否已添加, 若是添加, 則執行當前handler的chanelRead方法, 其實這裏咱們基本上就明白了, 經過fireChannelRead方法傳遞事件的過程當中, 其實就是找到相關handler執行其channelRead方法, 因爲咱們在這裏的handler就是head節點, 因此咱們跟到HeadContext的channelRead方法中:
HeadContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //向下傳遞channelRead事件
ctx.fireChannelRead(msg); }
在這裏咱們看到, 這裏經過fireChannelRead方法繼續往下傳遞channelRead事件, 而這種調用方式, 就是咱們剛纔分析用戶代碼的第一種調用方式:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1:
ctx.fireChannelRead(msg); //寫法2
ctx.pipeline().fireChannelRead(msg); }
這裏直接經過context對象調用fireChannelRead方法, 那麼和使用pipeline調用有什麼區別的, 我會回到HeadConetx的channelRead方法, 咱們來剖析ctx.fireChannelRead(msg)這句, 你們就會對這個問題有答案了, 跟到ctx的fireChannelRead方法中, 這裏會走到AbstractChannelHandlerContext類中的fireChannelRead方法中
跟到AbstractChannelHandlerContext類中的fireChannelRead方法:
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
這裏咱們看到, invokeChannelRead方法中傳入了一個findContextInbound()參數, 而這findContextInbound方法其實就是找到當前Context的下一個節點
跟到findContextInbound方法:
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
這裏的邏輯也比較簡單, 是經過一個doWhile循環, 找到當前handlerContext的下一個節點, 這裏要注意循環的終止條件, while (!ctx.inbound)表示下一個context標誌的事件不是inbound的事件, 則循環繼續往下找, 言外之意就是要找到下一個標註inbound事件的節點
有關事件的標註, 以前的小節已經剖析過了, 若是是用戶定義的handler, 是經過handler繼承的接口而定的, 若是tail或者head, 那麼是在初始化的時候就已經定義好, 這裏再也不贅述
回到AbstractChannelHandlerContext類的fireChannelRead方法中:
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
找到下一個節點後, 繼續調用invokeChannelRead方法, 傳入下一個和消息對象:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); //第一次執行next其實就是head
EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這裏的邏輯咱們又不陌生了, 由於咱們傳入的是當前context的下一個節點, 因此這裏會調用下一個節點invokeChannelRead方法, 因咱們剛纔剖析的是head節點, 因此下一個節點有多是用戶添加的handler的包裝類HandlerConext的對象
這裏咱們跟進invokeChannelRead方法中去:
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { //發生異常的時候在這裏捕獲異常
notifyHandlerException(t); } } else { fireChannelRead(msg); } }
又是咱們熟悉的邏輯, 調用了自身handler的channelRead方法, 若是是用戶自定義的handler, 則會走到用戶定義的channelRead()方法中去, 因此這裏就解釋了爲何經過傳遞channelRead事件, 最終會走到用戶重寫的channelRead方法中去
一樣, 也解釋了該小節最初提到過的兩種寫法的區別:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1:
ctx.fireChannelRead(msg); //寫法2
ctx.pipeline().fireChannelRead(msg); }
寫法1是經過當前節點往下傳播事件
寫法2是經過頭節點往下傳遞事件
因此, 在handler中若是若是要在channelRead方法中傳遞channelRead事件, 必定要採用寫法2的方式向下傳遞, 或者交給其父類處理, 若是採用1的寫法則每次事件傳輸到這裏都會繼續從head節點傳輸, 從而陷入死循環或者發生異常
這裏有一點須要注意, 若是用戶代碼中channelRead方法, 若是沒有顯示的調用ctx.fireChannelRead(msg)那麼事件則不會再往下傳播, 則事件會在這裏終止, 因此若是咱們寫業務代碼的時候要考慮有關資源釋放的相關操做
若是ctx.fireChannelRead(msg)則事件會繼續往下傳播, 若是每個handler都向下傳播事件, 固然, 根據咱們以前的分析channelRead事件只會在標識爲inbound事件的HandlerConetext中傳播, 傳播到最後, 則最終會調用到tail節點的channelRead方法
咱們跟到tailConext的channelRead方法中:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
咱們跟進到onUnhandledInboundMessage方法中:
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg); } finally { //釋放資源
ReferenceCountUtil.release(msg); } }
這裏作了釋放資源的相關的操做
至此, channelRead事件傳輸相關羅輯剖析完整, 其實對於inbound事件的傳輸流程都會遵循這一邏輯, 小夥伴們能夠自行剖析其餘inbound事件的傳輸流程