Netty源碼分析第6章(解碼器)---->第1節: ByteToMessageDecoder

 

Netty源碼分析第六章: 解碼器html

 

概述:ide

 

        在咱們上一個章節遺留過一個問題, 就是若是Server在讀取客戶端的數據的時候, 若是一次讀取不完整, 就觸發channelRead事件, 那麼Netty是如何處理這類問題的, 在這一章中, 會對此作詳細剖析源碼分析

 

        以前的章節咱們學習過pipeline, 事件在pipeline中傳遞, handler能夠將事件截取並對其處理, 而以後剖析的編解碼器, 其實就是一個handler, 截取byteBuf中的字節, 而後組建成業務須要的數據進行繼續傳播學習

 

        編碼器, 一般是OutBoundHandler, 也就是以自身爲基準, 對那些對外流出的數據作處理, 因此也叫編碼器, 將數據通過編碼發送出去編碼

 

        解碼器, 一般是inboundHandler, 也就是以自身爲基準, 對那些流向自身的數據作處理, 因此也叫解碼器, 將對向的數據接收以後通過解碼再進行使用spa

 

        一樣, 在netty的編碼器中, 也會對半包和粘包問題作相應的處理指針

        什麼是半包, 顧名思義, 就是不完整的數據包, 由於netty在輪詢讀事件的時候, 每次將channel中讀取的數據, 不必定是一個完整的數據包, 這種狀況, 就叫半包netty

        粘包一樣也不難理解, 若是client往server發送數據包, 若是發送頻繁頗有可能會將多個數據包的數據都發送到通道中, 若是在server在讀取的時候可能會讀取到超過一個完整數據包的長度, 這種狀況叫粘包code

        有關半包和粘包, 入下圖所示:server

 

6-0-1

         netty對半包的或者粘包的處理其實也很簡單, 經過以前的學習, 咱們知道, 每一個handler是和channel惟一綁定的, 一個handler只對應一個channel, 因此將channel中的數據讀取時候通過解析, 若是不是一個完整的數據包, 則解析失敗, 將這塊數據包進行保存, 等下次解析時再和這個數據包進行組裝解析, 直到解析到完整的數據包, 纔會將數據包進行向下傳遞

 

         具體流程是在代碼中如何體現的呢?咱們進入到源碼分析中

 

第一節: ByteToMessageDecoder

 

 

ByteToMessageDecoder解碼器, 顧名思義, 是一個將Byte解析成消息的解碼器,

咱們看他的定義:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{ //類體省略 }

這裏繼承了ChannelInboundHandlerAdapter, 根據以前的學習, 咱們知道, 這是個inbound類型的handler, 也就是處理流向自身事件的handler

其次, 該類經過abstract關鍵字修飾, 說明是個抽象類, 在咱們實際使用的時候, 並非直接使用這個類, 而是使用其子類, 類定義瞭解碼器的骨架方法, 具體實現邏輯交給子類, 一樣, 在半包處理中也是由該類進行實現的

netty中不少解碼器都實現了這個類, 而且, 咱們也能夠經過實現該類進行自定義解碼器

 

咱們重點關注一下該類的一個屬性:

ByteBuf cumulation;

這個屬性, 就是有關半包處理的關鍵屬性, 從概述中咱們知道, netty會將不完整的數據包進行保存, 這個數據包就是保存在這個屬性中

以前的學習咱們知道, ByteBuf讀取完數據會傳遞channelRead事件, 傳播過程當中會調用handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是編碼的關鍵部分

咱們看其channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //若是message是byteBuf類型 if (msg instanceof ByteBuf) { //簡單當成一個arrayList, 用於盛放解析到的對象 CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; //當前累加器爲空, 說明這是第一次從io流裏面讀取數據 first = cumulation == null; if (first) { //若是是第一次, 則將累加器賦值爲剛讀進來的對象 cumulation = data; } else { //若是不是第一次, 則把當前累加的數據和讀進來的數據進行累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //調用子類的方法進行解析  callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } //記錄list長度 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //向下傳播  fireChannelRead(ctx, out, size); out.recycle(); } } else { //不是byteBuf類型則向下傳播  ctx.fireChannelRead(msg); } }

這方法比較長, 帶你們一步步剖析

首先判斷若是傳來的數據是ByteBuf, 則進入if塊中

 CodecOutputList out = CodecOutputList.newInstance() 這裏就當成一個ArrayList就好, 用於盛放解碼完成的數據

 ByteBuf data = (ByteBuf) msg 這步將數據轉化成ByteBuf

 first = cumulation == null  這裏表示若是cumulation == null, 說明沒有存儲板半包數據, 則將當前的數據保存在屬性cumulation中

若是 cumulation != null , 說明存儲了半包數據, 則經過cumulator.cumulate(ctx.alloc(), cumulation, data)將讀取到的數據和原來的數據進行累加, 保存在屬性cumulation中

咱們看cumulator屬性:

private Cumulator cumulator = MERGE_CUMULATOR;

這裏調用了其靜態屬性MERGE_CUMULATOR, 咱們跟過去:

public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; //不能到過最大內存 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } //將當前數據buffer  buffer.writeBytes(in); in.release(); return buffer; } };

這裏建立了Cumulator類型的靜態對象, 並重寫了cumulate方法, 這裏cumulate方法, 就是用於將ByteBuf進行拼接的方法:

方法中, 首先判斷cumulation的寫指針+in的可讀字節數是否超過了cumulation的最大長度, 若是超過了, 將對cumulation進行擴容, 若是沒超過, 則將其賦值到局部變量buffer中

而後將in的數據寫到buffer中, 將in進行釋放, 返回寫入數據後的ByteBuf

回到channelRead方法中:

最後經過callDecode(ctx, cumulation, out)方法進行解碼, 這裏傳入了Context對象, 緩衝區cumulation和集合out:

咱們跟到callDecode(ctx, cumulation, out)方法中:

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //只要累加器裏面有數據 while (in.isReadable()) { int outSize = out.size(); //判斷當前List是否有對象 if (outSize > 0) { //若是有對象, 則向下傳播事件  fireChannelRead(ctx, out, outSize); //清空當前list  out.clear(); //解碼過程當中如ctx被removed掉就break if (ctx.isRemoved()) { break; } outSize = 0; } //當前可讀數據長度 int oldInputLength = in.readableBytes(); //子類實現 //子類解析, 解析玩對象放到out裏面  decode(ctx, in, out); if (ctx.isRemoved()) { break; } //List解析前大小 和解析後長度同樣(什麼沒有解析出來) if (outSize == out.size()) { //原來可讀的長度==解析後可讀長度 //說明沒有讀取數據(當前累加的數據並無拼成一個完整的數據包) if (oldInputLength == in.readableBytes()) { //跳出循環(下次在讀取數據才能進行後續的解析) break; } else { //沒有解析到數據, 可是進行讀取了 continue; } } //out裏面有數據, 可是沒有從累加器讀取數據 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }

這裏首先循環判斷傳入的ByteBuf是否有可讀字節, 若是還有可讀字節說明沒有解碼完成, 則循環繼續解碼

而後判斷集合out的大小, 若是大小大於1, 說明out中盛放了解碼完成以後的數據, 而後將事件向下傳播, 並清空out

由於咱們第一次解碼out是空的, 因此這裏不會進入if塊, 這部分咱們稍後分析, 這裏繼續往下看

經過 int oldInputLength = in.readableBytes() 獲取當前ByteBuf, 其實也就是屬性cumulation的可讀字節數, 這裏就是一個備份用於比較, 咱們繼續往下看:

decode(ctx, in, out)方法是最終的解碼操做, 這部會讀取cumulation而且將解碼後的數據放入到集合out中, 在ByteToMessageDecoder中的該方法是一個抽象方法, 讓子類進行實現, 咱們使用的netty不少的解碼都是繼承了ByteToMessageDecoder並實現了decode方法從而完成了解碼操做, 一樣咱們也能夠遵循相應的規則進行自定義解碼器, 在以後的小節中會講解netty定義的解碼器, 並剖析相關的實現細節, 這裏咱們繼續往下看:

 if (outSize == out.size()) 這個判斷表示解析以前的out大小和解析以後out大小進行比較, 若是相同, 說明並無解析出數據, 咱們進入到if塊中:

 if (oldInputLength == in.readableBytes()) 表示cumulation的可讀字節數在解析以前和解析以後是相同的, 說明解碼方法中並無解析數據, 也就是當前的數據並非一個完整的數據包, 則跳出循環, 留給下次解析, 不然, 說明沒有解析到數據, 可是讀取了, 因此跳過該次循環進入下次循環

最後判斷 if (oldInputLength == in.readableBytes()) , 這裏表明out中有數據, 可是並無從cumulation讀數據, 說明這個out的內容是非法的, 直接拋出異常

 

咱們回到channRead方法中:

咱們關注finally中的內容:

finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } //記錄list長度 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //向下傳播  fireChannelRead(ctx, out, size); out.recycle(); }

首先判斷cumulation不爲null, 而且沒有可讀字節, 則將累加器進行釋放, 並設置爲null

以後記錄out的長度, 經過fireChannelRead(ctx, out, size)將channelRead事件進行向下傳播, 並回收out對象

咱們跟到fireChannelRead(ctx, out, size)方法中:

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { //遍歷List for (int i = 0; i < numElements; i ++) { //逐個向下傳遞  ctx.fireChannelRead(msgs.getUnsafe(i)); } }

這裏遍歷out集合, 並將裏面的元素逐個向下傳遞

 

以上就是有關解碼的骨架邏輯

 

上一節: SocketChannel讀取數據的過程

下一節: 固定長度解碼器

相關文章
相關標籤/搜索