深刻理解 Netty編碼流程及WriteAndFlush()的實現

編碼器的執行時機

首先, 咱們想經過服務端,往客戶端發送數據, 一般咱們會調用**ctx.writeAndFlush(數據)**的方式, 入參位置的數據多是基本數據類型,也可能對象html

其次,編碼器一樣屬於handler,只不過他是特化的專門用於編碼做用的handler, 在咱們的消息真正寫入jdk底層的ByteBuffer時前,數據須要通過編碼處理, 不是說不進行編碼就發送不出去,而是不通過編碼,客戶端可能接受到的是亂碼java

而後,咱們知道,ctx.writeAndFlush(數據)它實際上是出站處理器特有的行爲,所以註定了它須要在pipeline中進行傳遞,從哪裏進行傳遞呢? 從tail節點開始,一直傳播到header以前的咱們本身添加的自定義的解碼器git

WriteAndFlush()的邏輯

咱們跟進源碼WriteAndFlush()相對於Write(),它的flush字段是truegithub

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            //todo 由於flush 爲 true
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }

因而就會這樣設計模式

  • 逐個調用handler的write()
  • 逐個調用handler的flush()

知道這一點很重要,這意味這咱們知道了,事件傳播分紅兩波進行, 一波write,一波flush, 這兩波事件傳播的大致流程我寫在這裏, 在下面api

writepromise

  • 將ByteBuf 轉換成DirctBuffer
  • 將消息(DirctBuffer)封裝進entry 插入寫隊列
  • 設置寫狀態

flush緩存

  • 刷新標誌,設置寫狀態
  • 變量buffer隊列,過濾Buffer
  • 調用jdk底層的api,把ByteBuf寫入jdk原生的ByteBuffer

自定義一個簡單的編碼器

/**
 * @Author: Changwu
 * @Date: 2019/7/21 20:49
 */
public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> {

    // todo write動做會傳播到 MyPersonEncoder的write方法, 可是咱們沒有重寫, 因而就執行 父類 MessageToByteEncoder的write, 咱們進去看
    @Override
    protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyPersonEncoder....");
        // 消息頭  長度
        out.writeInt(msg.getLength());
        // 消息體
        out.writeBytes(msg.getContent());
    }
}

選擇繼承MessageToByteEncoder<T> 從消息到字節的編碼器數據結構

繼續跟進

ok,如今來到了咱們自定義的 解碼器MyPersonEncoder ,架構

可是,並沒看到正在傳播的writeAndFlush(),不要緊, 咱們本身的解碼器繼承了MessageToByteEncoder,這個父類中實現了writeAndFlush(),源碼以下:解析寫在源碼後面

// todo 看他的write方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {// todo 1  判斷當前是否能夠處理這個對象
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // todo 2 內存分配
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // todo 3 調用本類的encode(), 這個方法就是咱們本身實現的方法
                encode(ctx, cast, buf);
            } finally {
                // todo 4 釋放
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                // todo 5. 往前傳遞
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            // todo 釋放
            buf.release();
        }
    }
  • 將咱們發送的消息msg,封裝進了 ByteBuf 中
  • 編碼: 執行encode()方法,這是個抽象方法,由咱們自定義的編碼器實現
    • 咱們的實現很簡單,分別往Buf裏面寫入下面兩次數據
      • int類型的消息的長度
      • 消息體
  • 將msg釋放
  • 繼續向前傳遞 write()事件
  • 最終,釋放第一步建立的ByteBuf

小結

到這裏爲止,編碼器的執行流程已經完成了,咱們能夠看到,和解碼器的架構邏輯類似,相似於模板設計模式,對咱們來講,只不過是作了個填空題


其實到上面的最後一步 釋放第一步建立的ByteBuf以前 ,消息已經被寫到jdk底層的 ByteBuffer 中了,怎麼作的呢? 別忘了它的上一步, 繼續向前傳遞write()事件,再往前其實就是HeaderContext了,和HeaderContext直接關聯的就是unsafe類, 這並不奇怪,咱們都知道,netty中不管是客戶端仍是服務端channel底層的數據讀寫,都依賴unsafe

下面開始分析,WriteAndFlush()底層的兩波任務細節

第一波事件傳遞 write()

咱們跟進HenderContext的write() ,而HenderContext的中依賴的是unsafe.wirte()因此直接去 AbstractChannel的Unsafe 源碼以下:

@Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) { // todo 緩存 寫進來的 buffer
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            // todo buffer  Dirct化 , (咱們查看 AbstractNioByteBuf的實現)
            msg = filterOutboundMessage(msg);

            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        // todo 插入寫隊列  將 msg 插入到 outboundBuffer
        // todo outboundBuffer 這個對象是 ChannelOutBoundBuf類型的,它的做用就是起到一個容器的做用
        // todo 下面看, 是如何將 msg 添加進 ChannelOutBoundBuf中的
        outboundBuffer.addMessage(msg, size, promise);
    }

參數位置的msg,就是通過咱們自定義解碼器的父類進行包裝了的ByteBuf類型消息

這個方法主要作了三件事

  • 第一: filterOutboundMessage(msg); 將ByteBuf轉換成DirctByteBuf

當咱們進入查看他的實現時,idea會提示,它的子類重寫了這個方法, 是誰重寫的呢? 是AbstractNioByteChannel 這個類實際上是屬於客戶端陣營的類,和服務端的AbstractNioMessageChannel相提並論

源碼以下:

protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }

        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
  • 第二件事: 將轉換後的DirectBuffer插入到寫隊列中

什麼是寫隊列 ? 做用是啥?

它其實就是一個netty自定義的容器,使用的單向鏈表的結構,爲何要有這個容器呢? 回想一下,服務端須要向客戶端發送消息,消息進而被封裝進ByteBuf,可是呢, 往客戶端寫的方法有兩個

  • write()
  • writeAndFlush()

這個方法的區別是有的,前者只是進行了寫,(寫到了ByteBuf) 卻沒有將內容刷新到ByteBuffer,沒有刷新到緩存中,就沒辦法進一步把它寫入jdk原生的ByteBuffer中, 而 writeAndFlush()就比較方便,先把msg寫入ByteBuf,而後直接刷進socket,一套帶走,打完收工

可是若是客戶端恰恰就是不使用writeAndFlush(),而使用前者,那麼盛放消息的ByteBuf被傳遞到handler的最開始的位置,怎麼辦? unsafe也沒法把它寫給客戶端, 難道丟棄不成?

因而寫隊列就解決了這個問題,它以鏈表當作數據結構,新傳播過來的ByteBuf就會被他封裝成一個一個的節點(entry)進行維護,爲了區分這個鏈表中,哪一個節點是被使用過的,哪一個節點是沒有使用過的,他就用三個標記指針進行標記,以下:

  • flushedEntry 被刷新過的entry
  • tailEntry 尾節點
  • unflushedEntry 未被刷的entry

下面咱們看一下,它如何將一個新的節點,添加到寫隊列

addMessage(Object msg, int size, ChannelPromise promise) 添加寫隊列

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // todo 將上面的三者封裝成實體
    // todo 調用工廠方法, 建立  Entry  , 在 當前的ChannelOutboundBuffer 中每個單位都是一個 Entry, 用它進一步包裝 msg
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);

    // todo 調整三個指針, 去上面查看這三個指針的定義
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    // todo 跟進這個方法
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

看他的源碼,其實就是簡單的針對鏈表進行插入的操做,尾插入法, 一直往最後的位置插入,鏈表的頭被標記成unflushedEntry 這兩個節點之間entry,表示是能夠被flush的節點

在每次添加新的 節點後都調用incrementPendingOutboundBytes(entry.pendingSize, false)方法, 這個方法的做用是設置寫狀態, 設置怎樣的狀態呢? 咱們看它的源碼, 能夠看到,它會記錄下累計的ByteBuf的容量,一旦超出了閾值,就會傳播channel不可寫的事件

  • 這也是write()的第三件事
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // todo TOTAL_PENDING_SIZE_UPDATER 當前緩存中 存在的代寫的 字節
    // todo 累加
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // todo 判斷 新的將被寫的 buffer的容量不能超過  getWriteBufferHighWaterMark() 默認是 64*1024  64字節
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        // todo 超過64 字節,進入這個方法
        setUnwritable(invokeLater);
    }
}

小結:

到目前爲止,第一波write()事件已經完成了,咱們能夠看到了,這個事件的功能就是使用ChannelOutBoundBuf將write事件傳播過去的單個ByteBuf維護起來,等待 flush事件的傳播

第二波事件傳遞 flush()

咱們從新回到,AbstractChannel中,看他的第二波flush事件的傳播狀態, 源碼以下:它也是主要作了下面的三件事

  • 添加刷新標誌,設置寫狀態
  • 遍歷buffer隊列,過濾能夠flush的buffer
  • 調用jdk底層的api,進行自旋寫
// todo 最終傳遞到 這裏
@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // todo 添加刷新標誌, 設置寫狀態
    outboundBuffer.addFlush();

    // todo  遍歷buffer隊列, 過濾byteBuf
    flush0();
}

添加刷新標誌,設置寫狀態

什麼是添加刷新標誌呢? 其實就是更改鏈表中的指針位置,三個指針之間的能夠完美的把entry劃分出曾經flush過的和未flush節點

ok,繼續

下面看一下如何設置狀態,addflush() 源碼以下:

* todo 給 ChannelOutboundBuffer 添加緩存, 這意味着, 原來添加進 ChannelOutboundBuffer 中的全部 Entry, 所有會被標記爲 flushed 過
 */
public void addFlush() {
// todo 默認讓 entry 指向了 unflushedEntry ==> 其實鏈表中的最左邊的 未被使用過的 entry
// todo
Entry entry = unflushedEntry;

if (entry != null) {
    if (flushedEntry == null) {
        // there is no flushedEntry yet, so start with the entry
        flushedEntry = entry;
    }
    do {
        flushed ++;
        if (!entry.promise.setUncancellable()) {
            // Was cancelled so make sure we free up memory and notify about the freed bytes
            int pending = entry.cancel();
            // todo 跟進這個方法
            decrementPendingOutboundBytes(pending, false, true);
        }
        entry = entry.next;
    } while (entry != null);

    // All flushed so reset unflushedEntry
    unflushedEntry = null;
}
}

目標是移動指針,改變每個節點的狀態, 哪個指針呢? 是 flushedEntry , 它指向讀被flush的節點,也就是說,它左邊的,都被處理過了

下面的代碼,是選出一開始位置, 由於, 若是flushedEntry == null,說明沒有任何一個曾經被flush過的節點,因而就將開始的位置定位到最左邊開始,

if (flushedEntry == null) {
    // there is no flushedEntry yet, so start with the entry
    flushedEntry = entry;
}

緊接着一個do-while循環,從最後一個被flushedEntry的地方,到尾部,挨個遍歷每個節點, 由於這些節點要被flush進緩存,咱們須要把write時累加的他們的容量減掉, 源碼以下

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    // todo 每次 減去 -size
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    // todo   默認 getWriteBufferLowWaterMark() -32kb
    // todo   newWriteBufferSize<32 就把不可寫狀態改成可寫狀態
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

一樣是使用原子類作到的這件事, 此外,通過減小的容量,若是小於了32kb就會傳播 channel可寫的事件

遍歷buffer隊列, 過濾byteBuf

這是flush的重頭戲,它實現了將數據寫入socket的操做

咱們跟進它的源碼,doWrite(ChannelOutboundBuffer in) 這是本類AbstractChannel的抽象方法, 寫如的邏輯方法,被設計成抽象的,具體往那個channel寫,和具體的實現有關, 當前咱們想往客戶端寫, 它的實現是AbstractNioByteChannel,咱們進入它的實現,源碼以下

boolean setOpWrite = false;
        // todo 總體是無限循環, 過濾ByteBuf
for (;;) {
    // todo 獲取第一個 flushedEntity, 這個entity中 有咱們須要的 byteBuf
    Object msg = in.current();
    if (msg == null) {
        // Wrote all messages.
        clearOpWrite();
        // Directly return here so incompleteWrite(...) is not called.
        return;
    }

    if (msg instanceof ByteBuf) {
        // todo 第三部分,jdk底層, 進行自旋的寫
        ByteBuf buf = (ByteBuf) msg;
        int readableBytes = buf.readableBytes();
        if (readableBytes == 0) {
            // todo 當前的 ByteBuf 中,沒有可寫的, 直接remove掉
            in.remove();
            continue;
        }

        boolean done = false;
        long flushedAmount = 0;
        if (writeSpinCount == -1) {
            // todo 獲取自旋鎖, netty使用它進行
            writeSpinCount = config().getWriteSpinCount();
        }
        // todo 這個for循環是在自旋嘗試往 jdk底層的 ByteBuf寫入數據
        for (int i = writeSpinCount - 1; i >= 0; i --) {

            // todo  把 對應的 buf , 寫到socket中
            // todo localFlushedAmount就是 本次 往jdk底層的 ByteBuffer 中寫入了多少字節
            int localFlushedAmount = doWriteBytes(buf);

            if (localFlushedAmount == 0) {
                setOpWrite = true;
                break;
            }
            // todo 累加一共寫了多少字節
            flushedAmount += localFlushedAmount;
            // todo 若是buf中的數據所有寫完了, 設置完成的狀態, 退出循環
            if (!buf.isReadable()) {
                done = true;
                break;
            }
        }

        in.progress(flushedAmount);

        // todo 自旋結束,寫完了  done = true
        if (done) {
            // todo 跟進去
            in.remove();
        } else {
            // Break the loop and so incompleteWrite(...) is called.
            break;
        }
    ....

這一段代碼也是很是長, 它的主要邏輯以下:

經過一個無限循環,保證能夠拿到全部的節點上的ByteBuf,經過這個函數獲取節點, Object msg = in.current(); 咱們進一步看它的實現,以下,它只會取出咱們標記的節點

public Object current() {
        Entry entry = flushedEntry;
        if (entry == null) {
            return null;
        }

        return entry.msg;
    }

下一步, 使用jdk的自旋鎖,循環16次,嘗試往jdk底層的ByteBuffer中寫數據, 調用函數doWriteBytes(buf);他是本類的抽象方法, 具體的實現是,客戶端chanel的封裝類NioSocketChannel實現的源碼以下:

// todo
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    // todo 將字節數據, 寫入到 java 原生的 channel中
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

這個readBytes()依然是抽象方法,由於前面咱們曾經把從ByteBuf轉化成了Dirct類型的, 因此它的實現類是PooledDirctByteBuf 繼續跟進以下: 終於見到了親切的一幕

// todo
    @Override
    public int readBytes(GatheringByteChannel out, int length) throws IOException {
        checkReadableBytes(length);
        //todo  關鍵的就是 getBytes()  跟進去
        int readBytes = getBytes(readerIndex, out, length, true);
        readerIndex += readBytes;
        return readBytes;
    }
    
    跟進getBytes(){
        index = idx(index);
        // todo 將netty 的 ByteBuf 塞進 jdk的    ByteBuffer tmpBuf;
        tmpBuf.clear().position(index).limit(index + length);
        // todo 調用jdk的write()方法
        return out.write(tmpBuf);
    }

此外,被使用過的節點會被remove()掉, 源碼以下, 也是針對鏈表的操做

private void removeEntry(Entry e) {
        if (-- flushed == 0) { // todo 若是是最後一個節點, 把全部的指針所有設爲 null
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else { //todo 若是 不是最後一個節點, 把當前節點,移動到最後的 節點
            flushedEntry = e.next;
        }
    }

小結

到這裏, 第二波任務的傳播就完成了

write

  • 將buffer 轉換成DirctBuffer
  • 將消息entry 插入寫隊列
  • 設置寫狀態

flush

  • 刷新標誌,設置寫狀態
  • 變量buffer隊列,過濾Buffer
  • 調用jdk底層的api,把ByteBuf寫入jdk原生的ByteBuffer

原文出處:https://www.cnblogs.com/ZhuChangwu/p/11228433.html

相關文章
相關標籤/搜索