上一篇,咱們分析了NioEventLoop及其相關類的主幹邏輯代碼,咱們知道netty採用線程封閉的方式來避免多線程之間的資源競爭,最大限度地減小併發問題,減小鎖的使用,於是可以有效減低線程切換的開銷,減小cpu的使用時間。此外,咱們還簡單分析了netty對於線程組的封裝EventLoopGroup,目前通常採用roundRobin的方式在多個線程上均勻地分配channel。經過前面幾篇文章的分析,咱們已經對channel的初始化,註冊到EventLoop上,SingleThreadEventLoop的線程啓動過程以及線程中運行的代碼邏輯有了一些瞭解,此外咱們也分析了用於處理基於TCP協議的io事件的NioEventLoop類的具體的循環邏輯,經過對代碼的詳細分析,咱們瞭解了對於connect,write,read,accept事件的不一樣處理邏輯,可是對於write和read事件的處理邏輯咱們並無分析的很詳細,由於這些事件的處理涉及到netty中另外一個很重要的模塊,ChannelPipeline以及一系列相關的類如Channel, ChannelHandler, ChannelhandlerContext等的理解,netty中的事件處理採用了經典的責任鏈(responsbility chain)的的設計模式,這種設計模式使得netty的io事件處理框架易於擴展,而且爲業務邏輯提供了一個很好的抽象模型,大大下降了netty的使用難度,使得io事件的處理變得更符合思惟習慣。
好了,廢話了那麼多,其實主要是想把前面分析的幾篇的文章作一個小結和回顧,而後引出本篇的主題--netty的io事件處理鏈模式。
由於netty的代碼結構相對來講仍是很規整,它的模塊之間的邊界劃分比較明確,EventLoop做爲io事件的「發源地」,與其交互的對象是Channel類,而ChannelPipeline,ChannelhandlerContext, ChannelHandler等幾個類則是與Channel交互,他們並不直接與EventLoop交互。git
首先每個Channel在初始化的時候就會建立一個ChannelPipeline,這點咱們在前面分析NioSocketChannel的初始化時也分析到了。目前ChannelPipeline的實現只有DefaultChannelPipeline一種,因此咱們也以DefaultChannelPipeline來分析。DefaultChannelPipeline內部有一個雙向鏈表結構,這個鏈表的每一個節點都是一個AbstractChannelHandlerContext類型的節點,DefaultChannelPipeline剛初始化時就會建立兩個初始節點,分別是HeadContext和TailContext,這兩個節點也並不徹底是標記節點,他們都有各自實際的做用,github
下面,咱們以兩個最重要的事件讀事件和寫事件,來分析netty的這種鏈式處理結構究竟是怎麼運轉的。設計模式
首先,咱們須要找到一個產生讀事件並調用相關方法使得讀事件開始傳遞的例子,很天然咱們應該想到在EventLoop中會產生讀事件。
以下,就是NioEventLoop中對於讀事件的處理,經過調用NioUnsafe.read方法promise
// 處理read和accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
咱們繼續看NioByteUnsafe.read方法,這個方法咱們以前在分析NioEventLoop事件處理邏輯時提到過,這個方法首先會經過緩衝分配器分配一個緩衝,而後從channel(也就是socket)中將數據讀到緩衝中,每讀一個緩衝,就會觸發一個讀事件,咱們看具體的觸發讀事件的調用:多線程
do { // 分配一個緩衝 byteBuf = allocHandle.allocate(allocator); // 將通道的數據讀取到緩衝中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 若是沒有讀取到數據,說明通道中沒有待讀取的數據了, if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. // 由於沒讀取到數據,因此應該釋放緩衝 byteBuf.release(); byteBuf = null; // 若是讀取到的數據量是負數,說明通道已經關閉了 close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } // 更新Handle內部的簿記量 allocHandle.incMessagesRead(1); readPending = false; // 向channel的處理器流水線中觸發一個事件, // 讓取到的數據可以被流水線上的各個ChannelHandler處理 pipeline.fireChannelRead(byteBuf); byteBuf = null; // 這裏根據以下條件判斷是否繼續讀: // 上一次讀取到的數據量大於0,而且讀取到的數據量等於分配的緩衝的最大容量, // 此時說明通道中還有待讀取的數據 } while (allocHandle.continueReading());
爲了代碼邏輯的完整性,我這裏把整個循環的代碼都貼上來,其實咱們要關注的僅僅是pipeline.fireChannelRead(byteBuf)這一句,好了,如今咱們找到ChannelPipeline觸發讀事件的入口方法,咱們順着這個方法,順藤摸瓜就能一步步理清事件的傳遞過程了。併發
若是咱們看一下ChannelPipeline接口,這裏面的方法名都是以fire開頭的,實際就是想表達這些方法都是觸發了一個事件,而後這個事件就會在內部的處理器鏈表中傳遞。
咱們看到這裏調用了一個靜態方法,而且以頭結點爲參數,也就是說事件傳遞是從頭結點開始的。框架
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
能夠看到,這個方法中經過調用invokeChannelRead執行處理邏輯異步
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { // 維護引用計數,主要是爲了偵測資源泄漏問題 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 調用invokeChannelRead執行處理邏輯 next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這裏能夠看到,AbstractChannelHandlerContext經過本身內部的handler對象來實現讀數據的邏輯。這也體現了ChannelHandlerContext在整個結構中的做用,其實它是起到了在ChannelPipeline和handler之間的一箇中間人的角色,那咱們要問:既然ChannelHandlerContext不起什麼實質性的做用,那爲何要多這一個中間層呢,這樣設計的好處是什麼?我認爲這樣設計實際上是爲了盡最大可能對使用者屏蔽netty框架的細節,試想若是沒有這個context的中間角色,使用者必然要詳細地瞭解ChannelPipeline,而且還要考慮事件傳遞是找下一個節點,還要考慮下一個節點應該沿着鏈表的正序找仍是沿着鏈表 倒敘找,因此這裏ChannelHandlerContext的角色我認爲最大的做用就是封裝了鏈表的邏輯,而且封裝了不一樣類型操做的傳播方式。固然也起到了一些引用傳遞的做用,如channel引用能夠簡介地傳遞給用戶。
好了,回到正題,從前面的方法中咱們知道讀事件最早是從HeadContext節點開始的,因此咱們看一下HeadContext的channelRead方法(由於HeadContext也實現了handler方法,而且返回的就是自身)socket
private void invokeChannelRead(Object msg) { // 若是這個handler已經準備就緒,那麼就執行處理邏輯 // 不然將事件傳遞給下一個處理器節點 if (invokeHandler()) { try { // 調用內部的handler的channelRead方法 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
這裏的調用也是一個重要的注意點,這裏調用了ChannelHandlerContext.fireChannelRead方法,這正是事件傳播的方法,fire開頭的方法的做用就是將當前的操做(或者叫事件)從當前處理節點傳遞給下一個處理節點。這樣就實現了事件在鏈表中的傳播。ide
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }
到這裏咱們先暫停一下,總結一下讀事件(或者是讀操做)在ChannelPipeline內部的傳播機制,其實很簡單,
此外,咱們分析一下寫數據的操做是怎麼傳播的。分析寫數據操做的入口並不想讀事件那麼好找,在netty中用戶的代碼中寫數據最終都是被放到內部的緩衝中,當NioEventLoop中監聽到底層的socket能夠寫數據的事件時,其實是吧當前緩衝中的數據發送到socket中,而對於用戶來說,是接觸不到socketChannel這一層的。
根據前面的分析,咱們知道,用戶通常都會與Channel,ChannelHandler, ChannelhandlerContext這幾種類打交道,寫數據的操做也是經過Channel的write和writeAndFlush觸發的,這兩個方法區別在於writeAndFlush在寫完數據後還會觸發一次刷寫操做,將緩衝中的數據實際寫入到socket中。
仍然是將操做交給內部的ChannelPipeline,觸發流水線操做
public ChannelFuture write(Object msg, ChannelPromise promise) { return pipeline.write(msg, promise); }
這裏能夠很清楚地看出來,寫數據的操做從爲節點開始,可是TailContext並未重寫write方法,因此最終調用的仍是AbstractChannelHandlerContext中的相應方法。
咱們沿着調用鏈往下走,發現write系列的方法實際上是將寫操做傳遞給了下一個ChannelOutboundHandler類型的處理節點,注意這裏是從尾節點向前找,遍歷鏈表的順序和讀數據正好相反。
真正調用
public final ChannelFuture write(Object msg, ChannelPromise promise) { return tail.write(msg, promise); }
從這個方法能夠明顯地看出來,write方法將寫操做交給了下一個ChannelOutboundHandler類型的處理器節點。
private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } // 沿着鏈表向前遍歷,找到下一個ChannelOutboundHandler類型的處理器節點 final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { // 調用AbstractChannelHandlerContext.invokeWriteAndFlush方法執行真正的寫入邏輯 next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { // 若是當前是異步地寫入數據,那麼須要將寫入的邏輯封裝成一個任務添加到EventLoop的任務對隊列中 final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
咱們接着看invokeWrite0方法
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
這裏能夠清楚地看到,最終是調用了handler的write方法執行真正的寫入邏輯,這個邏輯實際上就是有用戶本身實現的。
private void invokeWrite0(Object msg, ChannelPromise promise) { try { // 調用當前節點的handler的write方法執行真正的寫入邏輯 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
到這裏,咱們已經知道寫入的操做是怎麼從尾節點開始,也知道了經過調用當前處理節點的AbstractChannelHandlerContext.write方法能夠將寫入操做傳遞給下一個節點,那麼數據通過層層傳遞後,最終是怎麼寫到socket中的呢?回答這個問題,咱們須要看一下HeadContext的代碼!咱們知道寫入的操做是從尾節點向前傳遞的,那麼頭節點HeadContext就是傳遞的最後一個節點。
最終調用了unsafe.write方法。
在AbstractChannel.AbstractUnsafe的實現中,write方法將通過前面一系列處理器處理過的數據存放到內部的緩衝中。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { unsafe.write(msg, promise); }
前面咱們提到,寫數據的操做除了write還有writeAndFlush,這個操做除了寫數據,還會緊接着執行一次刷寫操做。刷寫操做也會從尾節點向前傳遞,最終傳遞到頭結點HeadContext,其中的flush方法以下:
public void flush(ChannelHandlerContext ctx) { unsafe.flush(); }
在AbstractChannel.AbstractUnsafe的實現中,flush操做會將前面存儲在內部緩衝區中的數據吸入到socket中,從而完成刷寫。
本節,咱們主要經過io事件處理中最重要的兩種事件,即讀事件和寫事件爲切入點 詳細分析了netty中對於這兩種事件的處理方法。其中寫數據的事件與咱們以前在jdk nio中創建起的印象差異仍是不大的,都是對從socket中讀取的數據進行處理,可是寫事件跟jdk nio中的概念就有較大差異了,由於netty對數據的寫入作了很大的改變和優化,用戶代碼中經過channel調用相關的寫數據的方法,這個方法會觸發處理器鏈條上的全部相關的處理器對待寫入的數據進行加工,最後在頭結點HeadCOntext中被寫入channel內部的緩衝區,經過flush操做將緩衝的數據寫入socket中。 這裏面最重要的也是最值得咱們學習的一點就是責任鏈模式,顯然,這又是一次對責任鏈模式的成功運用,是的框架的擴展性大大加強,並且面向用戶的接口更加容易理解,簡單易用,向用戶屏蔽了大部分框架實現細節。