編碼器實現了ChannelOutboundHandler
,並將出站數據從 一種格式轉換爲另外一種格式,和咱們方纔學習的解碼器的功能正好相反。Netty 提供了一組類, 用於幫助你編寫具備如下功能的編碼器:java
解碼器一般須要在Channel
關閉以後產生最後一個消息(所以也就有了 decodeLast()
方法)
這顯然不適於編碼器的場景——在鏈接被關閉以後仍然產生一個消息是毫無心義的react
其接受一Short
型實例做爲消息,編碼爲Short
的原子類型值,並寫入ByteBuf
,隨後轉發給ChannelPipeline
中的下一個 ChannelOutboundHandler
每一個傳出的 Short 值都將會佔用 ByteBuf 中的 2 字節
編程
Netty 提供了一些專門化的 MessageToByteEncoder
,可基於此實現本身的編碼器WebSocket08FrameEncoder
類提供了一個很好的實例
promise
你已經看到了如何將入站數據從一種消息格式解碼爲另外一種
爲了完善這幅圖,將展現 對於出站數據將如何從一種消息編碼爲另外一種。MessageToMessageEncoder
類的 encode()
方法提供了這種能力
爲了演示,使用IntegerToStringEncoder
擴展了 MessageToMessageEncoder
緩存
關於有趣的 MessageToMessageEncoder 的專業用法,請查看 io.netty.handler. codec.protobuf.ProtobufEncoder
類,它處理了由 Google 的 Protocol Buffers 規範所定義 的數據格式。markdown
pipeline中的標準鏈表結構
java對象編碼過程
write:寫隊列
flush:刷新寫隊列
writeAndFlush: 寫隊列並刷新數據結構
數據從head節點流入,先拆包,而後解碼成業務對象,最後通過業務Handler
處理,調用write
,將結果對象寫出去
而寫的過程先經過tail
節點,而後經過encoder
節點將對象編碼成ByteBuf
,最後將該ByteBuf
對象傳遞到head
節點,調用底層的Unsafe寫到JDK底層管道併發
爲何咱們在pipeline中添加了encoder節點,java對象就轉換成netty能夠處理的ByteBuf,寫到管道里?socket
咱們先看下調用write的code
業務處理器接受到請求以後,作一些業務處理,返回一個user
ide
而後,user在pipeline中傳遞
情形一
情形二
handler 若是不覆蓋 flush 方法,就會一直向前傳遞直到 head 節點
落到 Encoder
節點,下面是 Encoder
的處理流程
按照簡單自定義協議,將Java對象 User 寫到傳入的參數 out中,這個out究竟是什麼?
需知User
對象,從BizHandler
傳入到 MessageToByteEncoder
時,首先傳到 write
encode
,這裏就調回到 Encoder
這個Handler
中ByteBuf
了,那麼這個對象就已經無用,釋放掉 (當傳入的msg
類型是ByteBuf
時,就不須要本身手動釋放了)//112 若是buf中寫入了數據,就把buf傳到下一個節點,直到 header 節點
//115 不然,釋放buf,將空數據傳到下一個節點
// 120 若是當前節點不能處理傳入的對象,直接扔給下一個節點處理
// 127 當buf在pipeline中處理完以後,釋放
Handler
是否能處理寫入的消息
Encoder
能夠處理的 Response
對象ByteBuf
encoder
,即進入到 Encoder 的 encode方法,該方法是用戶代碼,用戶將數據寫入ByteBuf總結就是,Encoder
節點分配一個ByteBuf
,調用encode
方法,將Java對象根據自定義協議寫入到ByteBuf,而後再把ByteBuf傳入到下一個節點,在咱們的例子中,最終會傳入到head節點
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
這裏的msg就是前面在Encoder節點中,載有java對象數據的自定義ByteBuf對象
如下過程分三步講解
assertEventLoop
確保該方法的調用是在reactor
線程中filterOutboundMessage()
,將待寫入的對象過濾,把非ByteBuf
對象和FileRegion
過濾,把全部的非直接內存轉換成直接內存DirectBuffer
想要理解上面這段代碼,須掌握寫緩存中的幾個消息指針
ChannelOutboundBuffer 裏面的數據結構是一個單鏈表結構,每一個節點是一個 Entry,Entry 裏面包含了待寫出ByteBuf 以及消息回調 promise下面分別是
ChannelOutboundBuffer
緩衝區的最後一個節點初次調用write 即 addMessage
後fushedEntry
指向空,unFushedEntry
和 tailEntry
都指向新加入節點
第二次調用 addMessage
後
第n次調用 addMessage
後
可得,調用n次addMessage
後
flushedEntry
指針一直指向null
,表此時還沒有有節點需寫到Socket緩衝區unFushedEntry
後有n個節點,表當前還有n個節點還沒有寫到Socket緩衝區統計當前有多少字節須要須要被寫出
當前緩衝區中有多少待寫字節
無論調用channel.flush()
,仍是ctx.flush()
,最終都會落地到pipeline
中的head
節點
以後進入到AbstractUnsafe
flush方法中,先調用
結合前面的圖來看,上述過程即
首先拿到 unflushedEntry
指針,而後將flushedEntry
指向unflushedEntry
所指向的節點,調用完畢後
接下來,調用 flush0()
發現這裏的核心代碼就一個 doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { // 拿到第一個須要flush的節點的數據 Object msg = in.current(); if (msg instanceof ByteBuf) { boolean done = false; long flushedAmount = 0; // 拿到自旋鎖迭代次數 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } // 自旋,將當前節點寫出 for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // 寫完以後,將當前節點刪除 if (done) { in.remove(); } else { break; } } } }
第一步,調用current()
先拿到第一個須要flush
的節點的數據
第二步,拿到自旋鎖的迭代次數
第三步 調用 JDK 底層 API 進行自旋寫
自旋的方式將ByteBuf
寫到JDK NIO的Channel
強轉爲ByteBuf,若發現沒有數據可讀,直接刪除該節點
拿到自旋鎖迭代次數
在併發編程中使用自旋鎖能夠提升內存使用率和寫的吞吐量,默認值爲16
繼續看源碼
javaChannel()
,代表 JDK NIO Channel 已介入這次事件
獲得向JDK 底層已經寫了多少字節
從 Netty 的 bytebuf 寫到 JDK 底層的 bytebuffer
第四步,刪除該節點
節點的數據已經寫入完畢,接下來就須要刪除該節點
首先拿到當前被flush
掉的節點(flushedEntry
所指)
而後拿到該節點的回調對象 ChannelPromise
, 調用 removeEntry()
移除該節點
這裏是邏輯移除,只是將flushedEntry指針移到下個節點,調用後
隨後,釋放該節點數據的內存,調用safeSuccess
回調,用戶代碼能夠在回調裏面作一些記錄,下面是一段Example
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { // 回調 } })
最後,調用 recycle
,將當前節點回收
writeAndFlush
在某個Handler
中被調用以後,最終會落到 TailContext
節點
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { write(msg, true, promise); return promise; }
最終,經過一個boolean
變量,表示是調用invokeWriteAndFlush
,仍是invokeWrite
,invokeWrite
即是咱們上文中的write過程
能夠看到,最終調用的底層方法和單獨調用write
和flush
同樣的
由此看來,invokeWriteAndFlush
基本等價於write
以後再來一次flush
write
並無將數據寫到Socket緩衝區中,而是寫到了一個單向鏈表的數據結構中,flush
纔是真正的寫出writeAndFlush
等價於先將數據寫到netty的緩衝區,再將netty緩衝區中的數據寫到Socket緩衝區中,寫的過程與併發編程相似,用自旋鎖保證寫成功ByteBuf
,將Java對象轉換爲ByteBuf
,而後再把ByteBuf
繼續向前傳遞,若沒有再重寫了,最終會傳播到 head 節點,其中緩衝區列表拿到緩存寫到 JDK 底層 ByteBuffer