Netty 源碼深度解析(九) - 編碼

概述

一個問題



編碼器實現了ChannelOutboundHandler,並將出站數據從 一種格式轉換爲另外一種格式,和咱們方纔學習的解碼器的功能正好相反。Netty 提供了一組類, 用於幫助你編寫具備如下功能的編碼器:java

  • 將消息編碼爲字節
  • 將消息編碼爲消息
    咱們將首先從抽象基類 MessageToByteEncoder 開始來對這些類進行考察
1 抽象類 MessageToByteEncoder

MessageToByteEncoder API
解碼器一般須要在Channel關閉以後產生最後一個消息(所以也就有了 decodeLast()方法)
這顯然不適於編碼器的場景——在鏈接被關閉以後仍然產生一個消息是毫無心義的react

1.1 ShortToByteEncoder

其接受一Short 型實例做爲消息,編碼爲Short的原子類型值,並寫入ByteBuf,隨後轉發給ChannelPipeline中的下一個 ChannelOutboundHandler
每一個傳出的 Short 值都將會佔用 ByteBuf 中的 2 字節
ShortToByteEncoder
編程

1.2 Encoder

Netty 提供了一些專門化的 MessageToByteEncoder,可基於此實現本身的編碼器
WebSocket08FrameEncoder類提供了一個很好的實例
promise

2 抽象類 MessageToMessageEncoder

你已經看到了如何將入站數據從一種消息格式解碼爲另外一種
爲了完善這幅圖,將展現 對於出站數據將如何從一種消息編碼爲另外一種。MessageToMessageEncoder類的 encode()方法提供了這種能力
MessageToMessageEncoderAPI
爲了演示,使用IntegerToStringEncoder 擴展了 MessageToMessageEncoder緩存

  • 編碼器將每一個出站 Integer 的 String 表示添加到了該 List 中

    IntegerToStringEncoder的設計

關於有趣的 MessageToMessageEncoder 的專業用法,請查看 io.netty.handler. codec.protobuf.ProtobufEncoder類,它處理了由 Google 的 Protocol Buffers 規範所定義 的數據格式。markdown

一個java對象最後是如何轉變成字節流,寫到socket緩衝區中去的

pipeline中的標準鏈表結構
java對象編碼過程
write:寫隊列
flush:刷新寫隊列
writeAndFlush: 寫隊列並刷新數據結構

pipeline中的標準鏈表結構

標準的pipeline鏈式結構
數據從head節點流入,先拆包,而後解碼成業務對象,最後通過業務Handler處理,調用write,將結果對象寫出去
而寫的過程先經過tail節點,而後經過encoder節點將對象編碼成ByteBuf,最後將該ByteBuf對象傳遞到head節點,調用底層的Unsafe寫到JDK底層管道併發

Java對象編碼過程

爲何咱們在pipeline中添加了encoder節點,java對象就轉換成netty能夠處理的ByteBuf,寫到管道里?socket

咱們先看下調用write的code

業務處理器接受到請求以後,作一些業務處理,返回一個useride

  • 而後,user在pipeline中傳遞
    AbstractChannel#
    DefaultChannelPipeline#
    AbstractChannelHandlerContext#
    AbstractChannelHandlerContext#

  • 情形一
    AbstractChannelHandlerContext#
    AbstractChannelHandlerContext#

  • 情形二
    AbstractChannelHandlerContext#

    AbstractChannelHandlerContext#invokeWrite0
    AbstractChannelHandlerContext#invokeFlush0
    handler 若是不覆蓋 flush 方法,就會一直向前傳遞直到 head 節點

落到 Encoder節點,下面是 Encoder 的處理流程

按照簡單自定義協議,將Java對象 User 寫到傳入的參數 out中,這個out究竟是什麼?

需知User對象,從BizHandler傳入到 MessageToByteEncoder時,首先傳到 write

1. 判斷當前Handelr是否能處理寫入的消息(匹配對象)



  • 判斷該對象是不是該類型參數匹配器實例可匹配到的類型
    TypeParameterMatcher#
    具體實例

2 分配內存


3 編碼實現

  • 調用encode,這裏就調回到 Encoder 這個Handler
  • 其爲抽象方法,所以自定義實現類實現編碼方法

4 釋放對象

  • 既然自定義Java對象轉換成ByteBuf了,那麼這個對象就已經無用,釋放掉 (當傳入的msg類型是ByteBuf時,就不須要本身手動釋放了)

5 傳播數據

//112 若是buf中寫入了數據,就把buf傳到下一個節點,直到 header 節點

6 釋放內存

//115 不然,釋放buf,將空數據傳到下一個節點
// 120 若是當前節點不能處理傳入的對象,直接扔給下一個節點處理
// 127 當buf在pipeline中處理完以後,釋放

Encoder處理傳入的Java對象

  • 判斷當前Handler是否能處理寫入的消息
    • 若是能處理,進入下面的流程
    • 不然,直接扔給下一個節點處理
  • 將對象強制轉換成Encoder 能夠處理的 Response對象
  • 分配一個ByteBuf
  • 調用encoder,即進入到 Encoder 的 encode方法,該方法是用戶代碼,用戶將數據寫入ByteBuf
  • 既然自定義Java對象轉換成ByteBuf了,那麼這個對象就已經無用了,釋放掉(當傳入的msg類型是ByteBuf時,無需本身手動釋放)
  • 若是buf中寫入了數據,就把buf傳到下一個節點,不然,釋放buf,將空數據傳到下一個節點
  • 最後,當buf在pipeline中處理完以後,釋放節點

總結就是,Encoder節點分配一個ByteBuf,調用encode方法,將Java對象根據自定義協議寫入到ByteBuf,而後再把ByteBuf傳入到下一個節點,在咱們的例子中,最終會傳入到head節點

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

這裏的msg就是前面在Encoder節點中,載有java對象數據的自定義ByteBuf對象

write - 寫buffer隊列


ChannelOutboundInvoker#

write(Object msg, boolean flush, ChannelPromise promise)



HeadContext in DefaultChannelPipeline#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
Unsafe in Channel#write(Object msg, ChannelPromise promise)
如下過程分三步講解

direct ByteBuf


AbstractChannel#filterOutboundMessage(Object msg)

  • 首先,調用assertEventLoop確保該方法的調用是在reactor線程中
  • 而後,調用 filterOutboundMessage(),將待寫入的對象過濾,把非ByteBuf對象和FileRegion過濾,把全部的非直接內存轉換成直接內存DirectBuffer

    AbstractNioChannel#newDirectBuffer

插入寫隊列

  • 接下來,估算出須要寫入的ByteBuf的size

  • 最後,調用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,因此,接下來,咱們須要重點看一下這個方法幹了什麼事情
    ChannelOutboundBuffer

想要理解上面這段代碼,須掌握寫緩存中的幾個消息指針

ChannelOutboundBuffer 裏面的數據結構是一個單鏈表結構,每一個節點是一個 Entry,Entry 裏面包含了待寫出ByteBuf 以及消息回調 promise下面分別是

三個指針的做用

  • flushedEntry
    表第一個被寫到OS Socket緩衝區中的節點
    ChannelOutboundBuffer
  • unFlushedEntry
    表第一個未被寫入到OS Socket緩衝區中的節點
    ChannelOutboundBuffer
  • tailEntry
    ChannelOutboundBuffer緩衝區的最後一個節點
    ChannelOutboundBuffer

圖解過程

  • 初次調用write 即 addMessage

    fushedEntry指向空,unFushedEntrytailEntry都指向新加入節點

  • 第二次調用 addMessage

  • 第n次調用 addMessage

可得,調用n次addMessage

  • flushedEntry指針一直指向null,表此時還沒有有節點需寫到Socket緩衝區
  • unFushedEntry後有n個節點,表當前還有n個節點還沒有寫到Socket緩衝區

設置寫狀態

ChannelOutboundBuffer#addMessage

  • 統計當前有多少字節須要須要被寫出
    ChannelOutboundBuffer#addMessage(Object msg, int size, ChannelPromise promise)

  • 當前緩衝區中有多少待寫字節
    ChannelOutboundBuffer#


ChannelConfig#getWriteBufferHighWaterMark()

  • 因此默認不能超過64k
    WriteBufferWaterMark

  • 自旋鎖+CAS 操做,經過 pipeline 將事件傳播到channelhandler 中監控

flush:刷新buffer隊列

添加刷新標誌並設置寫狀態

  • 無論調用channel.flush(),仍是ctx.flush(),最終都會落地到pipeline中的head節點
    DefaultChannelPipeline#flush

  • 以後進入到AbstractUnsafe
    AbstractChannel#flush()

  • flush方法中,先調用
    ChannelOutboundBuffer#addFlush
    ChannelOutboundBuffer#decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability)

    和以前那個實例相同,再也不贅述

  • 結合前面的圖來看,上述過程即
    首先拿到 unflushedEntry 指針,而後將flushedEntry指向unflushedEntry所指向的節點,調用完畢後

遍歷 buffer 隊列,過濾bytebuf

  • 接下來,調用 flush0()

  • 發現這裏的核心代碼就一個 doWrite
    AbstractChannel#

AbstractNioByteChannel
  • 繼續跟
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一個須要flush的節點的數據
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋鎖迭代次數
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,將當前節點寫出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 寫完以後,將當前節點刪除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}
  • 第一步,調用current()先拿到第一個須要flush的節點的數據
    ChannelOutboundBuffer#current

  • 第二步,拿到自旋鎖的迭代次數

  • 第三步 調用 JDK 底層 API 進行自旋寫
    自旋的方式將ByteBuf寫到JDK NIO的Channel
    強轉爲ByteBuf,若發現沒有數據可讀,直接刪除該節點

  • 拿到自旋鎖迭代次數

image.png

  • 在併發編程中使用自旋鎖能夠提升內存使用率和寫的吞吐量,默認值爲16
    ChannelConfig

  • 繼續看源碼

    AbstractNioByteChannel#

  • javaChannel(),代表 JDK NIO Channel 已介入這次事件
    NioSocketChannel#
    ByteBuf#readBytes(GatheringByteChannel out, int length)

  • 獲得向JDK 底層已經寫了多少字節
    PooledDirectByteBuf#

  • 從 Netty 的 bytebuf 寫到 JDK 底層的 bytebuffer

  • 第四步,刪除該節點
    節點的數據已經寫入完畢,接下來就須要刪除該節點

    首先拿到當前被flush掉的節點(flushedEntry所指)
    而後拿到該節點的回調對象 ChannelPromise, 調用 removeEntry()移除該節點

    這裏是邏輯移除,只是將flushedEntry指針移到下個節點,調用後

    隨後,釋放該節點數據的內存,調用safeSuccess回調,用戶代碼能夠在回調裏面作一些記錄,下面是一段Example

ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
       // 回調 
    }
})

最後,調用 recycle,將當前節點回收

writeAndFlush: 寫隊列並刷新

writeAndFlush在某個Handler中被調用以後,最終會落到 TailContext節點

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

AbstractChannelHandlerContext#
AbstractChannelHandlerContext#

最終,經過一個boolean變量,表示是調用invokeWriteAndFlush,仍是invokeWriteinvokeWrite即是咱們上文中的write過程
AbstractChannelHandlerContext#
能夠看到,最終調用的底層方法和單獨調用writeflush同樣的


由此看來,invokeWriteAndFlush基本等價於write以後再來一次flush

總結
  • 調用write並無將數據寫到Socket緩衝區中,而是寫到了一個單向鏈表的數據結構中,flush纔是真正的寫出
  • writeAndFlush等價於先將數據寫到netty的緩衝區,再將netty緩衝區中的數據寫到Socket緩衝區中,寫的過程與併發編程相似,用自旋鎖保證寫成功
  • netty中的緩衝區中的ByteBuf爲DirectByteBuf

當 BizHandler 經過 writeAndFlush 方法將自定義對象往前傳播時,其實能夠拆分紅兩個過程

  • 經過 pipeline逐漸往前傳播,傳播到其中的一個 encode 節點後,其負責重寫 write 方法將自定義的對象轉化爲 ByteBuf,接着繼續調用 write 向前傳播
  • pipeline中的編碼器原理是建立一個ByteBuf,將Java對象轉換爲ByteBuf,而後再把ByteBuf繼續向前傳遞,若沒有再重寫了,最終會傳播到 head 節點,其中緩衝區列表拿到緩存寫到 JDK 底層 ByteBuffer
相關文章
相關標籤/搜索