Netty源碼解析3-Pipeline

請戳GitHub原文: github.com/wangzhiwubi…java

更多文章關注:多線程/集合/分佈式/Netty/NIO/RPCgit

Channel實現概覽

在Netty裏,Channel是通信的載體,而ChannelHandler負責Channel中的邏輯處理。github

那麼ChannelPipeline是什麼呢?我以爲能夠理解爲ChannelHandler的容器:一個Channel包含一個ChannelPipeline,全部ChannelHandler都會註冊到ChannelPipeline中,並按順序組織起來。面試

在Netty中,ChannelEvent是數據或者狀態的載體,例如傳輸的數據對應MessageEvent,狀態的改變對應ChannelStateEvent。當對Channel進行操做時,會產生一個ChannelEvent,併發送到ChannelPipeline。ChannelPipeline會選擇一個ChannelHandler進行處理。這個ChannelHandler處理以後,可能會產生新的ChannelEvent,並流轉到下一個ChannelHandler。編程

插播一則廣告

  • 全網惟一一個從0開始幫助Java開發者轉作大數據領域的公衆號~
  • 公衆號大數據技術與架構或者搜索import_bigdata關注,大數據學習路線最新更新,已經有不少小夥伴加入了~

channel pipeline

例如,一個數據最開始是一個MessageEvent,它附帶了一個未解碼的原始二進制消息ChannelBuffer,而後某個Handler將其解碼成了一個數據對象,並生成了一個新的MessageEvent,並傳遞給下一步進行處理。網絡

到了這裏,能夠看到,其實Channel的核心流程位於ChannelPipeline中。因而咱們進入ChannelPipeline的深層夢境裏,來看看它具體的實現。多線程

ChannelPipeline的主流程

Netty的ChannelPipeline包含兩條線路:Upstream和Downstream。Upstream對應上行,接收到的消息、被動的狀態改變,都屬於Upstream。Downstream則對應下行,發送的消息、主動的狀態改變,都屬於Downstream。ChannelPipeline接口包含了兩個重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e),就分別對應了Upstream和Downstream。架構

對應的,ChannelPipeline裏包含的ChannelHandler也包含兩類:ChannelUpstreamHandlerChannelDownstreamHandler。每條線路的Handler是互相獨立的。它們都很簡單的只包含一個方法:ChannelUpstreamHandler.handleUpstreamChannelDownstreamHandler.handleDownstream併發

Netty官方的javadoc裏有一張圖(ChannelPipeline接口裏),很是形象的說明了這個機制(我對原圖進行了一點修改,加上了ChannelSink,由於我以爲這部分對理解代碼流程會有些幫助):框架

channel pipeline

什麼叫ChannelSink呢?ChannelSink包含一個重要方法ChannelSink.eventSunk,能夠接受任意ChannelEvent。"sink"的意思是"下沉",那麼"ChannelSink"好像能夠理解爲"Channel下沉的地方"?實際上,它的做用確實是這樣,也能夠換個說法:"處於末尾的萬能Handler"。最初讀到這裏,也有些困惑,這麼理解以後,就感受簡單許多。只有Downstream包含ChannelSink,這裏會作一些創建鏈接、綁定端口等重要操做。爲何UploadStream沒有ChannelSink呢?我只能認爲,一方面,不符合"sink"的意義,另外一方面,也沒有什麼處理好作的吧!

這裏有個值得注意的地方:在一條「流」裏,一個ChannelEvent並不會主動的"流"經全部的Handler,而是由上一個Handler顯式的調用ChannelPipeline.sendUp(Down)stream產生,並交給下一個Handler處理。也就是說,每一個Handler接收到一個ChannelEvent,並處理結束後,若是須要繼續處理,那麼它須要調用sendUp(Down)stream新發起一個事件。若是它再也不發起事件,那麼處理就到此結束,即便它後面仍然有Handler沒有執行。這個機制能夠保證最大的靈活性,固然對Handler的前後順序也有了更嚴格的要求。

順便說一句,在Netty 3.x裏,這個機制會致使大量的ChannelEvent對象建立,所以Netty 4.x版本對此進行了改進。twitter的finagle框架實踐中,就提到從Netty 3.x升級到Netty 4.x,能夠大大下降GC開銷。有興趣的能夠看看這篇文章:blog.twitter.com/2013/netty-…

下面咱們從代碼層面來對這裏面發生的事情進行深刻分析,這部分涉及到一些細節,須要打開項目源碼,對照來看,會比較有收穫。

深刻ChannelPipeline內部

DefaultChannelPipeline的內部結構

ChannelPipeline的主要的實現代碼在DefaultChannelPipeline類裏。列一下DefaultChannelPipeline的主要字段:

public class DefaultChannelPipeline implements ChannelPipeline {
    
        private volatile Channel channel;
        private volatile ChannelSink sink;
        private volatile DefaultChannelHandlerContext head;
        private volatile DefaultChannelHandlerContext tail;
        private final Map<String, DefaultChannelHandlerContext> name2ctx =
            new HashMap<String, DefaultChannelHandlerContext>(4);
    }
複製代碼

這裏須要介紹一下ChannelHandlerContext這個接口。顧名思義,ChannelHandlerContext保存了Netty與Handler相關的的上下文信息。而我們這裏的DefaultChannelHandlerContext,則是對ChannelHandler的一個包裝。一個DefaultChannelHandlerContext內部,除了包含一個ChannelHandler,還保存了"next"和"prev"兩個指針,從而造成一個雙向鏈表。

所以,在DefaultChannelPipeline中,咱們看到的是對DefaultChannelHandlerContext的引用,而不是對ChannelHandler的直接引用。這裏包含"head"和"tail"兩個引用,分別指向鏈表的頭和尾。而name2ctx則是一個按名字索引DefaultChannelHandlerContext用戶的一個map,主要在按照名稱刪除或者添加ChannelHandler時使用。

sendUpstream和sendDownstream

前面提到了,ChannelPipeline接口的兩個重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e)全部事件的發起都是基於這兩個方法進行的。Channels類有一系列fireChannelBound之類的fireXXXX方法,其實都是對這兩個方法的facade包裝。

下面來看一下這兩個方法的實現。先看sendUpstream(對代碼作了一些簡化,保留主邏輯):

public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        head.getHandler().handleUpstream(head, e);
    }
    
    private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
        DefaultChannelHandlerContext realCtx = ctx;
        while (!realCtx.canHandleUpstream()) {
            realCtx = realCtx.next;
            if (realCtx == null) {
                return null;
            }
        }
        return realCtx;
    }
複製代碼

這裏最終調用了ChannelUpstreamHandler.handleUpstream來處理這個ChannelEvent。有意思的是,這裏咱們看不到任何"將Handler向後移一位"的操做,可是咱們總不能每次都用同一個Handler來進行處理啊?實際上,咱們更爲經常使用的是ChannelHandlerContext.handleUpstream方法(實現是DefaultChannelHandlerContext.sendUpstream方法):

public void sendUpstream(ChannelEvent e) {
		DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
		DefaultChannelPipeline.this.sendUpstream(next, e);
	}
複製代碼

能夠看到,這裏最終仍然調用了ChannelPipeline.sendUpstream方法,可是它會將Handler指針後移

咱們接下來看看DefaultChannelHandlerContext.sendDownstream:

public void sendDownstream(ChannelEvent e) {
		DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
		if (prev == null) {
			try {
				getSink().eventSunk(DefaultChannelPipeline.this, e);
			} catch (Throwable t) {
				notifyHandlerException(e, t);
			}
		} else {
			DefaultChannelPipeline.this.sendDownstream(prev, e);
		}
	}
複製代碼

與sendUpstream好像不大相同哦?這裏有兩點:一是到達末尾時,就如夢境二所說,會調用ChannelSink進行處理;二是這裏指針是往前移的,因此咱們知道了:

**UpstreamHandler是從前日後執行的,DownstreamHandler是從後往前執行的。**在ChannelPipeline裏添加時須要注意順序了!

DefaultChannelPipeline裏還有些機制,像添加/刪除/替換Handler,以及ChannelPipelineFactory等,比較好理解,就不細說了。

回到現實:Pipeline解決的問題

好了,深刻分析完代碼,有點頭暈了,咱們回到最開始的地方,來想想,Netty的Pipeline機制解決了什麼問題?

我認爲至少有兩點:

一是提供了ChannelHandler的編程模型,基於ChannelHandler開發業務邏輯,基本不須要關心網絡通信方面的事情,專一於編碼/解碼/邏輯處理就能夠了。Handler也是比較方便的開發模式,在不少框架中都有用到。

二是實現了所謂的"Universal Asynchronous API"。這也是Netty官方標榜的一個功能。用過OIO和NIO的都知道,這兩套API風格相差極大,要從一個遷移到另外一個成本是很大的。即便是NIO,異步和同步編程差距也很大。而Netty屏蔽了OIO和NIO的API差別,經過Channel提供對外接口,並經過ChannelPipeline將其鏈接起來,所以替換起來很是簡單。

universal API

理清了ChannelPipeline的主流程,咱們對Channel部分的大體結構算是弄清楚了。但是到了這裏,咱們依然對一個鏈接具體怎麼處理沒有什麼概念,下篇文章,咱們會分析一下,在Netty中,捷徑如何處理鏈接的創建、數據的傳輸這些事情。

參考資料:

請戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData

                   關注公衆號,內推,面試,資源下載,關注更多大數據技術~
                   大數據成神之路~預計更新500+篇文章,已經更新60+篇~ 
複製代碼