Netty 源碼解析(四): Netty 的 ChannelPipeline

今天是猿燈塔「365篇原創計劃」第四篇。java

接下來的時間燈塔君持續更新Netty系列一共九篇面試

Netty 源碼解析(一): 開始服務器

Netty 源碼解析(二): Netty 的 Channel微信

Netty 源碼解析(三): Netty 的 Future 和 Promise併發

當前:Netty 源碼解析(四): Netty 的 ChannelPipelineide

Netty 源碼解析(五): Netty 的線程池分析oop

Netty 源碼解析(六): Channel 的 register 操做源碼分析

Netty 源碼解析(七): NioEventLoop 工做流程this

Netty 源碼解析(八): 回到 Channel 的 register 操做spa

Netty 源碼解析(九): connect 過程和 bind 過程分析

今天呢!燈塔君跟你們講:

Netty 的 ChannelPipeline

ChannelPipeline和Inbound、Outbound

我想不少讀者應該或多或少都有 Netty 中 pipeline 的概念。前面咱們說了,使用 Netty 的時候,咱們一般就只要寫一些自定義的 handler 就能夠了,咱們定義的這些 handler 會組成一個 pipeline,用於處理 IO 事件,這個和咱們平時接觸的 Filter 或 Interceptor 表達的差很少是一個意思。

每一個 Channel 內部都有一個 pipeline,pipeline 由多個 handler 組成,handler 之間的順序是很重要的,由於 IO 事件將按照順序順次通過 pipeline 上的 handler,這樣每一個 handler 能夠專一於作一點點小事,由多個 handler 組合來完成一些複雜的邏輯。

從圖中,咱們知道這是一個雙向鏈表。

首先,咱們看兩個重要的概念:InboundOutbound。在 Netty 中,IO 事件被分爲 Inbound 事件和 Outbound 事件。

Outboundout 指的是 出去,有哪些 IO 事件屬於此類呢?好比 connect、write、flush 這些 IO 操做是往外部方向進行的,它們就屬於 Outbound 事件。

其餘的,諸如 accept、read 這種就屬於 Inbound 事件。

好比客戶端在發起請求的時候,須要 1️⃣connect 到服務器,而後 2️⃣write 數據傳到服務器,再而後 3️⃣read 服務器返回的數據,前面的 connect 和 write 就是 out 事件,後面的 read 就是 in 事件。

好比不少初學者看不懂下面的這段代碼,這段代碼用於服務端的 childHandler 中:

pipeline.addLast(new StringDecoder());

pipeline.addLast(new StringEncoder());

pipeline.addLast(new BizHandler());

初學者確定都納悶,覺得這個順序寫錯了,應該是先 decode 客戶端過來的數據,而後用 BizHandler 處理業務邏輯,最後再 encode 數據而後返回給客戶端,因此添加的順序應該是 1 -> 3 -> 2 纔對。

其實這裏的三個 handler 是分組的,分爲 Inbound(1 和 3) 和 Outbound(2):

  1. pipeline.addLast(new StringDecoder());
  2. pipeline.addLast(new StringEncoder());
  3. pipeline.addLast(new BizHandler());
  • 客戶端鏈接進來的時候,讀取(read)客戶端請求數據的操做是 Inbound 的,e 操做是 Outbound 的,此時使用的是 2。
  • 處理完數據後,返回給客戶端數據的 write 操做是 Outbound 的,此時使用的是 2。

因此雖然添加順序有點怪,可是執行順序實際上是按照 1 -> 3 -> 2 進行的。

若是咱們在上面的基礎上,加上下面的第四行,這是一個 OutboundHandler:

  1. pipeline.addLast(new OutboundHandlerA());

那麼執行順序是否是就是 1 -> 3 -> 2 -> 4 呢?答案是:不是的。

對於 Inbound 操做,按照添加順序執行每一個 Inbound 類型的 handler;而對於 Outbound 操做,是反着來的,從後往前,順次執行 Outbound 類型的 handler。

因此,上面的順序應該是先 1 後 3,它們是 Inbound 的,而後是 4,最後纔是 2,它們兩個是 Outbound 的。說實話,這種組織方式對新手應該非常頭疼。

那咱們在開發的時候怎麼寫呢?其實也很簡單,從最外層開始寫,一步步寫到業務處理層,把 Inbound 和 Outbound 混寫在一塊兒。好比 encode 和 decode 是屬於最外層的處理邏輯,先寫它們。假設 decode 之後是字符串,那再進來一層應該能夠寫進來和出去的日誌。再進來一層能夠寫 字符串 <=> 對象 的相互轉換。而後就應該寫業務層了。

到這裏,我想你們應該都知道 Inbound 和 Outbound 了吧?下面咱們來介紹它們的接口使用。

定義處理 Inbound 事件的 handler 須要實現 ChannelInboundHandler,定義處理 Outbound 事件的 handler 須要實現 ChannelOutboundHandler。最下面的三個類,是 Netty 提供的適配器,特別的,若是咱們但願定義一個 handler 能同時處理 Inbound 和 Outbound 事件,能夠經過繼承中間的 ChannelDuplexHandler 的方式,好比 LoggingHandler 這種既能夠用來處理 Inbound 也能夠用來處理 Outbound 事件的 handler。

有了 Inbound 和 Outbound 的概念之後,咱們來開始介紹 Pipeline 的源碼。

咱們說過,一個 Channel 關聯一個 pipeline,NioSocketChannel 和 NioServerSocketChannel 在執行構造方法的時候,都會走到它們的父類 AbstractChannel 的構造方法中:

`protected AbstractChannel(Channel parent) {

this.parent = parent;

// 給每一個 channel 分配一個惟一 id

id = newId();

// 每一個 channel 內部須要一個 Unsafe 的實例

unsafe = newUnsafe();

// 每一個 channel 內部都會建立一個 pipeline

pipeline = newChannelPipeline();

}`

上面的三行代碼中,id 比較不重要,Netty 中的 Unsafe 實例其實挺重要的,這裏簡單介紹一下。

在 JDK 的源碼中,sun.misc.Unsafe 類提供了一些底層操做的能力,它設計出來是給 JDK 中的源碼使用的,好比 AQS、ConcurrentHashMap 等,咱們在以前的併發包的源碼分析中也看到了不少它們使用 Unsafe 的場景,這個 Unsafe 類不是給咱們的代碼使用的,是給 JDK 源碼使用的(須要的話,咱們也是能夠獲取它的實例的)。

Unsafe 類的構造方法是 private 的,可是它提供了 getUnsafe() 這個靜態方法:

Unsafe unsafe = Unsafe.getUnsafe();

你們能夠試一下,上面這行代碼編譯沒有問題,可是執行的時候會拋 java.lang.SecurityException 異常,由於它就不是給咱們的代碼用的。

可是若是你就是想獲取 Unsafe 的實例,能夠經過下面這個代碼獲取到:

Field f = Unsafe.class.getDeclaredField("theUnsafe");

f.setAccessible(true);

Unsafe unsafe = (Unsafe) f.get(null);

Netty 中的 Unsafe 也是一樣的意思,它封裝了 Netty 中會使用到的 JDK 提供的 NIO 接口,好比將 channel 註冊到 selector 上,好比 bind 操做,好比 connect 操做等,這些操做都是稍微偏底層一些。Netty 一樣也是不但願咱們的業務代碼使用 Unsafe 的實例,它是提供給 Netty 中的源碼使用的。

不過,對於咱們源碼分析來講,咱們仍是會有不少時候須要分析 Unsafe 中的源碼的

關於 Unsafe,咱們後面用到了再說,這裏只要知道,它封裝了大部分須要訪問 JDK 的 NIO 接口的操做就行了。這裏咱們繼續將焦點放在實例化 pipeline 上:

`protected DefaultChannelPipeline newChannelPipeline() {

return new DefaultChannelPipeline(this);

}`

這裏開始調用 DefaultChannelPipeline 的構造方法,並把當前 channel 的引用傳入:

`protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}`

這裏實例化了 tail 和 head 這兩個 handler。tail 實現了 ChannelInboundHandler 接口,

而 head 實現了 ChannelOutboundHandler 和 ChannelInboundHandler 兩個接口,

而且最後兩行代碼將 tail 和 head 鏈接起來:複製代碼

注意,在不一樣的版本中,源碼也略有差別,head 不必定是 in + out,你們知道這點就行了。

還有,從上面的 head 和 tail 咱們也能夠看到,其實 pipeline 中的每一個元素是 ChannelHandlerContext 的實例,而不是 ChannelHandler 的實例,context 包裝了一下 handler,可是,後面咱們都會用 handler 來描述一個 pipeline 上的節點,而不是使用 context,但願讀者知道這一點。

這裏只是構造了 pipeline,而且添加了兩個固定的 handler 到其中(head + tail),還不涉及到自定義的 handler 代碼執行。咱們回過頭來看下面這段代碼:

咱們說過 childHandler 中指定的 handler 不是給 NioServerSocketChannel 使用的,是給 NioSocketChannel 使用的,因此這裏咱們不看它。

這裏調用 handler(…) 方法指定了一個 LoggingHandler 的實例,而後咱們再進去下面的 bind(…) 方法中看看這個 LoggingHandler 實例是怎麼進入到咱們以前構造的 pipeline 內的。

順着 bind() 一直往前走,bind() -> doBind() -> initAndRegister():

`final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 構造 channel 實例,同時會構造 pipeline 實例,
// 如今 pipeline 中有 head 和 tail 兩個 handler 了
channel = channelFactory.newChannel();
// 2. 看這裏
init(channel);
} catch (Throwable t) {
......
}`

上面的兩行代碼,第一行實現了構造 channel 和 channel 內部的 pipeline,咱們來看第二行 init 代碼:

`// ServerBootstrap:
@Override
void init(Channel channel) throws Exception {
......
// 拿到剛剛建立的 channel 內部的 pipeline 實例
ChannelPipeline p = channel.pipeline();
...
// 開始往 pipeline 中添加一個 handler,這個 handler 是 ChannelInitializer 的實例
p.addLast(new ChannelInitializer<Channel>() {
// 咱們之後會看到,下面這個 initChannel 方法什麼時候會被調用
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 這個方法返回咱們最開始指定的 LoggingHandler 實例
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 LoggingHandler
pipeline.addLast(handler);
}
// 先不用管這裏的 eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一個 handler 到 pipeline 中:ServerBootstrapAcceptor
// 從名字能夠看到,這個 handler 的目的是用於接收客戶端請求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
`
這裏涉及到 pipeline 中的輔助類 ChannelInitializer,咱們看到,它自己是一個 handler(Inbound 類型),可是它的做用和普通 handler 有點不同,它純碎是用來輔助將其餘的 handler 加入到 pipeline 中的。

你們能夠稍微看一下 ChannelInitializer 的 initChannel 方法,有個簡單的認識就好,此時的 pipeline 應該是這樣的:

ChannelInitializer 的 initChannel(channel) 方法被調用的時候,會往 pipeline 中添加咱們最開始指定的 LoggingHandler 和添加一個 ServerBootstrapAcceptor。可是咱們如今還不知道這個 initChannel 方法什麼時候會被調用。

上面咱們說的是做爲服務端的 NioServerSocketChannel 的 pipeline,NioSocketChannel 也是差很少的,咱們能夠看一下 Bootstrap 類的 init(channel) 方法:

`void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}`

它和服務端 ServerBootstrap 要添加 ServerBootstrapAcceptor 不同,它只須要將 EchoClient 類中的 ChannelInitializer 實例加進來就能夠了,它的 ChannelInitializer 中添加了兩個 handler,LoggingHandler 和 EchoClientHandler:

很顯然,咱們須要的是像 LoggingHandler 和 EchoClientHandler 這樣的 handler,可是,它們如今還不在 pipeline 中,那麼它們何時會真正進入到 pipeline 中呢?之後咱們再揭曉。

還有,爲何 Server 端咱們指定的是一個 handler 實例,而 Client 指定的是一個 ChannelInitializer 實例?其實它們是能夠隨意搭配使用的,你甚至能夠在 ChannelInitializer 實例中添加 ChannelInitializer 的實例。

很是抱歉,這裏又要斷了,下面要先介紹線程池了,你們要記住 pipeline 如今的樣子,head + channelInitializer + tail

本節沒有介紹 handler 的向後傳播,就是一個 handler 處理完了之後,怎麼傳遞給下一個 handler 來處理?好比咱們熟悉的 JavaEE 中的 Filter 是採用在一個 Filter 實例中調用 chain.doFilter(request, response) 來傳遞給下一個 Filter 這種方式的。

咱們用下面這張圖結束本節。下圖展現了傳播的方法,但我實際上是更想讓你們看一下,哪些事件是 Inbound 類型的,哪些是 Outbound 類型的:

Outbound 類型的幾個事件你們應該比較好認,注意 bind 也是 Outbound 類型的。

365天干貨不斷微信搜索「猿燈塔」第一時間閱讀,回覆【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板
相關文章
相關標籤/搜索