首先, 咱們想經過服務端,往客戶端發送數據, 一般咱們會調用**ctx.writeAndFlush(數據)
**的方式, 入參位置的數據多是基本數據類型,也可能對象html
其次,編碼器一樣屬於handler,只不過他是特化的專門用於編碼做用的handler, 在咱們的消息真正寫入jdk底層的ByteBuffer時前,數據須要通過編碼處理, 不是說不進行編碼就發送不出去,而是不通過編碼,客戶端可能接受到的是亂碼java
而後,咱們知道,ctx.writeAndFlush(數據)
它實際上是出站處理器特有的行爲,所以註定了它須要在pipeline中進行傳遞,從哪裏進行傳遞呢? 從tail節點開始,一直傳播到header以前的咱們本身添加的自定義的解碼器
中git
WriteAndFlush()
的邏輯咱們跟進源碼WriteAndFlush()
相對於Write()
,它的flush字段是truegithub
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //todo 由於flush 爲 true next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }
因而就會這樣設計模式
write()
flush()
知道這一點很重要,這意味這咱們知道了,事件傳播分紅兩波進行, 一波write,一波flush, 這兩波事件傳播的大致流程我寫在這裏, 在下面api
writepromise
flush緩存
ByteBuffer
/** * @Author: Changwu * @Date: 2019/7/21 20:49 */ public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> { // todo write動做會傳播到 MyPersonEncoder的write方法, 可是咱們沒有重寫, 因而就執行 父類 MessageToByteEncoder的write, 咱們進去看 @Override protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception { System.out.println("MyPersonEncoder...."); // 消息頭 長度 out.writeInt(msg.getLength()); // 消息體 out.writeBytes(msg.getContent()); } }
選擇繼承MessageToByteEncoder<T>
從消息到字節的編碼器數據結構
ok,如今來到了咱們自定義的 解碼器MyPersonEncoder
,架構
可是,並沒看到正在傳播的writeAndFlush()
,不要緊, 咱們本身的解碼器繼承了MessageToByteEncoder
,這個父類中實現了writeAndFlush()
,源碼以下:解析寫在源碼後面
// todo 看他的write方法 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {// todo 1 判斷當前是否能夠處理這個對象 @SuppressWarnings("unchecked") I cast = (I) msg; // todo 2 內存分配 buf = allocateBuffer(ctx, cast, preferDirect); try { // todo 3 調用本類的encode(), 這個方法就是咱們本身實現的方法 encode(ctx, cast, buf); } finally { // todo 4 釋放 ReferenceCountUtil.release(cast); } if (buf.isReadable()) { // todo 5. 往前傳遞 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { // todo 釋放 buf.release(); } }
encode()
方法,這是個抽象方法,由咱們自定義的編碼器實現
write()
事件到這裏爲止,編碼器的執行流程已經完成了,咱們能夠看到,和解碼器的架構邏輯類似,相似於模板設計模式,對咱們來講,只不過是作了個填空題
其實到上面的最後一步 釋放第一步建立的ByteBuf
以前 ,消息已經被寫到jdk底層的 ByteBuffer 中了,怎麼作的呢? 別忘了它的上一步, 繼續向前傳遞write()
事件,再往前其實就是HeaderContext
了,和HeaderContext
直接關聯的就是unsafe類, 這並不奇怪,咱們都知道,netty中不管是客戶端仍是服務端channel底層的數據讀寫,都依賴unsafe
下面開始分析,
WriteAndFlush()
底層的兩波任務細節
write()
咱們跟進HenderContext的write()
,而HenderContext的中依賴的是unsafe.wirte()
因此直接去 AbstractChannel
的Unsafe 源碼以下:
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // todo 緩存 寫進來的 buffer ReferenceCountUtil.release(msg); return; } int size; try { // todo buffer Dirct化 , (咱們查看 AbstractNioByteBuf的實現) msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // todo 插入寫隊列 將 msg 插入到 outboundBuffer // todo outboundBuffer 這個對象是 ChannelOutBoundBuf類型的,它的做用就是起到一個容器的做用 // todo 下面看, 是如何將 msg 添加進 ChannelOutBoundBuf中的 outboundBuffer.addMessage(msg, size, promise); }
參數位置的msg,就是通過咱們自定義解碼器的父類進行包裝了的ByteBuf
類型消息
這個方法主要作了三件事
filterOutboundMessage(msg);
將ByteBuf轉換成DirctByteBuf
當咱們進入查看他的實現時,idea會提示,它的子類重寫了這個方法, 是誰重寫的呢? 是AbstractNioByteChannel
這個類實際上是屬於客戶端陣營的類,和服務端的AbstractNioMessageChannel
相提並論
源碼以下:
protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
DirectBuffer
插入到寫隊列中它其實就是一個netty自定義的容器,使用的單向鏈表的結構,爲何要有這個容器呢? 回想一下,服務端須要向客戶端發送消息,消息進而被封裝進ByteBuf
,可是呢, 往客戶端寫的方法有兩個
這個方法的區別是有的,前者只是進行了寫,(寫到了ByteBuf) 卻沒有將內容刷新到ByteBuffer
,沒有刷新到緩存中,就沒辦法進一步把它寫入jdk原生的ByteBuffer
中, 而 writeAndFlush()
就比較方便,先把msg寫入ByteBuf
,而後直接刷進socket,一套帶走,打完收工
可是若是客戶端恰恰就是不使用writeAndFlush()
,而使用前者,那麼盛放消息的ByteBuf
被傳遞到handler的最開始的位置,怎麼辦? unsafe也沒法把它寫給客戶端, 難道丟棄不成?
因而寫隊列就解決了這個問題,它以鏈表當作數據結構,新傳播過來的ByteBuf
就會被他封裝成一個一個的節點(entry)進行維護,爲了區分這個鏈表中,哪一個節點是被使用過的,哪一個節點是沒有使用過的,他就用三個標記指針進行標記,以下:
下面咱們看一下,它如何將一個新的節點,添加到寫隊列
addMessage(Object msg, int size, ChannelPromise promise)
添加寫隊列public void addMessage(Object msg, int size, ChannelPromise promise) { // todo 將上面的三者封裝成實體 // todo 調用工廠方法, 建立 Entry , 在 當前的ChannelOutboundBuffer 中每個單位都是一個 Entry, 用它進一步包裝 msg Entry entry = Entry.newInstance(msg, size, total(msg), promise); // todo 調整三個指針, 去上面查看這三個指針的定義 if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 // todo 跟進這個方法 incrementPendingOutboundBytes(entry.pendingSize, false); }
看他的源碼,其實就是簡單的針對鏈表進行插入的操做,尾插入法, 一直往最後的位置插入,鏈表的頭被標記成unflushedEntry
這兩個節點之間entry,表示是能夠被flush的節點
在每次添加新的 節點後都調用incrementPendingOutboundBytes(entry.pendingSize, false)
方法, 這個方法的做用是設置寫狀態, 設置怎樣的狀態呢? 咱們看它的源碼, 能夠看到,它會記錄下累計的ByteBuf
的容量,一旦超出了閾值,就會傳播channel不可寫的事件
write()
的第三件事private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } // todo TOTAL_PENDING_SIZE_UPDATER 當前緩存中 存在的代寫的 字節 // todo 累加 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); // todo 判斷 新的將被寫的 buffer的容量不能超過 getWriteBufferHighWaterMark() 默認是 64*1024 64字節 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { // todo 超過64 字節,進入這個方法 setUnwritable(invokeLater); } }
到目前爲止,第一波write()
事件已經完成了,咱們能夠看到了,這個事件的功能就是使用ChannelOutBoundBuf
將write事件傳播過去的單個ByteBuf
維護起來,等待 flush事件的傳播
flush()
咱們從新回到,AbstractChannel
中,看他的第二波flush事件的傳播狀態, 源碼以下:它也是主要作了下面的三件事
// todo 最終傳遞到 這裏 @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // todo 添加刷新標誌, 設置寫狀態 outboundBuffer.addFlush(); // todo 遍歷buffer隊列, 過濾byteBuf flush0(); }
什麼是添加刷新標誌呢? 其實就是更改鏈表中的指針位置,三個指針之間的能夠完美的把entry
劃分出曾經flush過的和未flush節點
ok,繼續
下面看一下如何設置狀態,addflush() 源碼以下:
* todo 給 ChannelOutboundBuffer 添加緩存, 這意味着, 原來添加進 ChannelOutboundBuffer 中的全部 Entry, 所有會被標記爲 flushed 過 */ public void addFlush() { // todo 默認讓 entry 指向了 unflushedEntry ==> 其實鏈表中的最左邊的 未被使用過的 entry // todo Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); // todo 跟進這個方法 decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
目標是移動指針,改變每個節點的狀態, 哪個指針呢? 是 flushedEntry
, 它指向讀被flush的節點,也就是說,它左邊的,都被處理過了
下面的代碼,是選出一開始位置, 由於, 若是flushedEntry == null,說明沒有任何一個曾經被flush過的節點,因而就將開始的位置定位到最左邊開始,
if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; }
緊接着一個do-while循環,從最後一個被flushedEntry
的地方,到尾部,挨個遍歷每個節點, 由於這些節點要被flush進緩存,咱們須要把write時累加的他們的容量減掉, 源碼以下
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } // todo 每次 減去 -size long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); // todo 默認 getWriteBufferLowWaterMark() -32kb // todo newWriteBufferSize<32 就把不可寫狀態改成可寫狀態 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }
一樣是使用原子類作到的這件事, 此外,通過減小的容量,若是小於了32kb就會傳播 channel可寫的事件
這是flush的重頭戲,它實現了將數據寫入socket的操做
咱們跟進它的源碼,doWrite(ChannelOutboundBuffer in)
這是本類AbstractChannel
的抽象方法, 寫如的邏輯方法,被設計成抽象的,具體往那個channel寫,和具體的實現有關, 當前咱們想往客戶端寫, 它的實現是AbstractNioByteChannel
,咱們進入它的實現,源碼以下
boolean setOpWrite = false; // todo 總體是無限循環, 過濾ByteBuf for (;;) { // todo 獲取第一個 flushedEntity, 這個entity中 有咱們須要的 byteBuf Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } if (msg instanceof ByteBuf) { // todo 第三部分,jdk底層, 進行自旋的寫 ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { // todo 當前的 ByteBuf 中,沒有可寫的, 直接remove掉 in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { // todo 獲取自旋鎖, netty使用它進行 writeSpinCount = config().getWriteSpinCount(); } // todo 這個for循環是在自旋嘗試往 jdk底層的 ByteBuf寫入數據 for (int i = writeSpinCount - 1; i >= 0; i --) { // todo 把 對應的 buf , 寫到socket中 // todo localFlushedAmount就是 本次 往jdk底層的 ByteBuffer 中寫入了多少字節 int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } // todo 累加一共寫了多少字節 flushedAmount += localFlushedAmount; // todo 若是buf中的數據所有寫完了, 設置完成的狀態, 退出循環 if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // todo 自旋結束,寫完了 done = true if (done) { // todo 跟進去 in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; } ....
這一段代碼也是很是長, 它的主要邏輯以下:
經過一個無限循環,保證能夠拿到全部的節點上的ByteBuf
,經過這個函數獲取節點, Object msg = in.current();
咱們進一步看它的實現,以下,它只會取出咱們標記的節點
public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }
下一步, 使用jdk的自旋鎖,循環16次,嘗試往jdk底層的ByteBuffer中寫數據, 調用函數doWriteBytes(buf);
他是本類的抽象方法, 具體的實現是,客戶端chanel的封裝類NioSocketChannel
實現的源碼以下:
// todo @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); // todo 將字節數據, 寫入到 java 原生的 channel中 return buf.readBytes(javaChannel(), expectedWrittenBytes); }
這個readBytes()
依然是抽象方法,由於前面咱們曾經把從ByteBuf
轉化成了Dirct類型的, 因此它的實現類是PooledDirctByteBuf
繼續跟進以下: 終於見到了親切的一幕
// todo @Override public int readBytes(GatheringByteChannel out, int length) throws IOException { checkReadableBytes(length); //todo 關鍵的就是 getBytes() 跟進去 int readBytes = getBytes(readerIndex, out, length, true); readerIndex += readBytes; return readBytes; } 跟進getBytes(){ index = idx(index); // todo 將netty 的 ByteBuf 塞進 jdk的 ByteBuffer tmpBuf; tmpBuf.clear().position(index).limit(index + length); // todo 調用jdk的write()方法 return out.write(tmpBuf); }
private void removeEntry(Entry e) { if (-- flushed == 0) { // todo 若是是最後一個節點, 把全部的指針所有設爲 null // processed everything flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else { //todo 若是 不是最後一個節點, 把當前節點,移動到最後的 節點 flushedEntry = e.next; } }
到這裏, 第二波任務的傳播就完成了
write
flush
ByteBuffer
原文出處:https://www.cnblogs.com/ZhuChangwu/p/11228433.html