Netty源碼分析系列之writeAndFlush()上

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

前言

前兩篇文章中分析了 netty 中解碼器相關的源碼,解碼過程是發生在讀數據這一步的,那麼讀到數據,通過解碼器解碼後,最終就會交由咱們自定義的業務處理中執行,當咱們的業務邏輯處理完成後,就須要給客戶端響應消息,這就涉及到服務端如何經過 channel 將響應消息寫出去的流程了,同時還會涉及到消息的編碼過程,由於在 TCP 協議中,數據最終是經過字節流傳輸的,而咱們一般在業務代碼中是返回一個對象,所以須要進行編碼。接下來本文將會重點分析這兩個的過程的源碼實現。編程

Demo 代碼

爲了方便描述,這裏模擬一個簡單的場景:netty 服務端在讀到客戶端發來的消息後,netty 服務端就經過咱們自定義的 ChannelHandler 來進行業務處理,並返回一個 Data 對象,Data 類是咱們自定義的一個類,而後咱們將 Data 對象經過咱們自定義的一個編碼器 DataEncoder 進行編碼,最後將消息發送出去。promise

netty 服務端啓動的 demo 代碼微信

public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class)
            .group(bossGroup,workerGroup)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 向客戶端channel中添加兩個channelHandler
                    pipeline.addLast(new DataEncoder());
                    pipeline.addLast(new BusinessHandler());
                }
            });
    serverBootstrap.bind(8080).sync();
}
複製代碼

能夠看到,分別向客戶端的 channel 的 pipeline 中添加了兩個 ChannelHandler:DataEncoder 是自定義的一個編碼器,負責進行編碼,BusinessHandler 是負責進行業務處理的。所以建立出來的客戶端 channel 的 pipeline 的結構以下,這個結構很重要,後面的源碼分析全是基於這個結構來分析的。併發

Demo示例pipeline結構圖

DataEncoder 編碼器的源碼異步

/** * 實際上就是一個基於換行符的編碼器 */
public class DataEncoder extends MessageToByteEncoder<Data> {

    private static final String LINE = "\r\n";

    @Override
    protected void encode(ChannelHandlerContext ctx, Data msg, ByteBuf out) throws Exception {
        if(msg instanceof Data){
            out.writeBytes(msg.getMsg().getBytes());
            out.writeLong(msg.getServerTime());
            out.writeBytes(LINE.getBytes());
        }
    }
}
複製代碼

爲了簡化分析,BusinessHandler 的代碼比較簡單,它就是直接向客戶端返回一個 Data 對象ide

public class BusinessHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 讀到客戶端發來的消息後,服務端直接返回一個data對象
        Data data = new Data("Hello World",System.currentTimeMillis());
        ctx.channel().writeAndFlush(data);

    }
}
複製代碼

Data 類就是一個簡單的 Bean,包含兩個屬性:msgserverTimeoop

public class Data {

    private String msg;

    private Long serverTime;

    public Data(String msg, Long serverTime) {
        this.msg = msg;
        this.serverTime = serverTime;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Long getServerTime() {
        return serverTime;
    }

    public void setServerTime(Long serverTime) {
        this.serverTime = serverTime;
    }
}
複製代碼

源碼分析

當服務端讀到客戶端發送過來的消息後,就會經過客戶端 channel(即:NioSocketCahnnel)的 pipeline 進行傳播執行 channelRead() 方法,也就是會執行到咱們自定義的 BusinessHaandler 中的 channelRead()方法。此時咱們直接建立了一個 Data 對象,而後經過客戶端 channel 的 writeAndFlush() 方法,將 Data 對象寫出去,發送給客戶端。因此接下來詳細分析下 writeAndFlush()方法的工做原理。源碼分析

ctx.channel()獲取到的是客戶端 channel,調用客戶端 channel 的 writeAndFlush()時,最終會執行客戶端 channel 的 pipeline 的 writeAndFlush()方法。ui

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}
複製代碼

看到調用 pipeline 的方法,首先想到的是這個方法的調用,會沿着 pipeline 這個管道執行全部 channelHandler 的方法。進入到 pipeline 的 writeAndFlush(msg)方法的源碼中,果真,它是從 tail 節點開始向前傳播執行。

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    // 從尾結點開始向前傳播
    return tail.writeAndFlush(msg);
}
複製代碼

最終會調用 tail 節點的 write() 方法。在調用 write()方法時,傳入的第二個參數 flush 爲 true,表示的是須要執行 flush()方法(注意:此時傳入的是 true,記住這一點很重要,由於後面還會有一個地方也會調用 write()方法,可是傳入的是 false)。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 省略部分代碼...

    // 找到下一個Outbound類型的handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // 同步執行
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 異步執行,實際上就是將當前的寫操做封裝成一個task,而後提交到線程池
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            task.cancel();
        }
    }
}
複製代碼

write()方法的邏輯能夠分爲三步,第一步:找到 pipeline 中下一個 OutBound 類型的 channelHandler,對於咱們 demo 中的示例,就是 DataEncoder

第二步:根據當前的線程是不是 NioEventLoop 線程來判斷是同步執行仍是異步執行 writeAndFlush,因爲當前線程是 NioEventLoop,因此最終是同步執行。

第三步:根據傳入的 flush 標識來判斷是否須要執行 flush 方法,若是傳入的是 true,表示須要執行 flush,前面已經提到了,此時傳入的 flush 爲 true,所以最後會調用 next.invokeWriteAndFlush(m, promise) 這一行代碼。

在 invokeWriteAndFlush()中會先調用 invokeWrite0(),也就是觸發執行 write()方法,而後調用 invokeFlush0(),也就是觸發執行 flush()方法。今天先分析 write()方法相關的源碼,flush 相關的源碼下一篇文章分析。

invokeWrite0()會觸發執行 channelHandler 的 write()方法,此時傳播到的 handler 是咱們自定義的 DataEncoder(由於 tail 節點的前一個 OutBound 類型的節點就是 DataEncoder,注意:BusinessHandler 是 InBound 類型),因爲 DataEncoder 沒有重寫 write()方法,因此調用的是父類 MessageToByteEncoder 的 write()方法。在 write()方法中,會調用到子類的 encode()方法,對數據進行編碼。其源碼以下。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 判斷要寫出的數據對象msg,是否須要當前的MessageToByteEncoder來處理
        // 如何判斷呢?就根據類中傳入的泛型類型和msg的實際類型相比較
        // 若是類型匹配成功,那就使用MessageToByteEncoder來處理,即會進行編碼:encode()
        // 若是匹配不成功,則直接使用write()方法
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // 建立一個ByteBuf
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // 調用編碼器的編碼方法(在編碼後,會將編碼出來的字節數據存放到buf)
                encode(ctx, cast, buf);
            } finally {
                // 將cast對象的資源釋放,由於前面一步已經將對象進行編碼了,後面再也不須要用到該對象了
                ReferenceCountUtil.release(cast);
            }
            // 若是編碼成功,buf中就會有數據,若是有數據,那麼就就能夠調用write()方法了
            // 會調用父類AbstractChannelHandlerContext的write()方法
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 出現異常後,釋放資源
        if (buf != null) {
            buf.release();
        }
    }
}
複製代碼

write()方法的源碼比較長,執行流程能夠用以下流程圖表示。

write()流程

首先會根據 acceptOutboundMessage(msg) 方法判斷要寫出的數據對象 msg,是否須要當前的 MessageToByteEncoder 來處理。如何判斷呢?就根據類中傳入的泛型類型和 msg 的實際類型相比較,若是類型匹配成功,那就使用 MessageToByteEncoder 來處理,即會進行編碼:encode()。例如本文的示例代碼中,DataEncoder 這個編碼器中,傳入的泛型是 Data 類型,這也就意味着 DataEncoder 這個編碼器只會對類型是 Data 的數據進行編碼。在 BusinessHandler 中咱們發送的數據就是 Data 類型,所以此時 DataEncoder 編碼器會對 Data 對象進行編碼。若是類型匹配不成功,那麼就會直接調用 ctx.write(msg, promise)方法,向下一個 handler 傳播執行 write()方法。

當類型匹配成功之後,會先將 msg 的類型強轉爲指定的泛型類型,在 demo 中就是 Data 類型。而後 allocateBuffer()方法申請一塊內存,因爲 preferDirect 屬性默認是 true,所以會建立一塊堆外內存。接着調用 encode()方法進行編碼,因爲咱們定義的 DataEncoder 重寫了 encode()方法,因此這裏會調用 DataEncoder 的 encode()方法。

protected void encode(ChannelHandlerContext ctx, Data msg, ByteBuf out) throws Exception {
    if(msg instanceof Data){
        out.writeBytes(msg.getMsg().getBytes());
        out.writeLong(msg.getServerTime());
        out.writeBytes(LINE.getBytes());
    }
}
複製代碼

encode 中傳入的第一個參數就是當前持有 DataEncoder 的 context,第二個參數 msg 就是咱們在 BusinessHandler 中建立的 Data 對象,第三個參數 out 就是經過 allocateBuffer()建立出來的一塊堆外內存

encode()方法執行完成之後,再次回到父類的 write()中,因爲已經對 msg 對象編碼完成了,因此後面該對象沒有任何用處了,能夠直接釋放該對象。

若是編碼成功,buf 中就會有數據,若是有數據,buf.isReadable()會返回 true,那麼就接着調用 write()方法了,最終會調用父類 AbstractChannelHandlerContext 的 write()方法。若是沒有數據被編碼,那麼就將以前申請的 ByteBuf 內存釋放,而後依舊是調用父類 AbstractChannelHandlerContext 的 write()方法,只不過傳入的 msg 對象是一個空的 ByteBuf。

最終又會調用到 AbstractChannelHandlerContext 類中的 write(msg,flush,promise)方法(在前面分析 tail 節點的時候已經貼出了該方法的源碼),與以前不一樣的是,此時傳入的第二個參數 flush 爲 false,表示不調用 flush。一樣先經過 pipeline 找到下一個 Outbound 節點,對於本文 demo 中,下一個節點就是 Head 節點,因爲 flush 傳入的是 false,所以會調用 next.invokeWrite(m, promise),也就是最終會調用 head 節點的 write()方法。head 節點的 write()方法源碼以下。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // 對於客戶端Channel而言,unsafe對象是NioSocketChannelUnsafe類型的實例
    unsafe.write(msg, promise);
}
複製代碼

能夠看到,在 head 節點中最終是經過 unsafe 對象來寫數據的,此時因爲 channel 是 NioSocketChannel,因此 unsafe 是 NioSocketChannelUnsafe 對象實例。最終調用的是 AbstractChannel 類的 write()方法。源碼以下。

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    // 非空判斷
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, newClosedChannelException(initialCloseCause));
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        // 將msg轉化成DirectByteBuf
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        // 異常處理
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    // 將待寫的數據添加到待寫隊列中,待寫隊列是一個entry組成的鏈表
    outboundBuffer.addMessage(msg, size, promise);
}
複製代碼

這該方法中有兩個比較重要的邏輯,第一處邏輯:若是 msg 不是堆外內存,會先將 msg 轉換爲 DirectByteBuf 類型(也就是堆外內存);第二處邏輯:也就是該方法的最後一行,這一行代碼的做用是啥呢?將待寫的數據添加到待寫隊列中,待寫隊列是一個 entry 組成的鏈表,被存放在 ChannelOutboundBuffer 這個緩衝區中。下面詳細分析一下這一行代碼的實現原理。

netty 在發送數據時,是先 write(),而後再 flush(),這樣纔會把數據發送到操做系統的套接字中。若是僅僅是調用 write()方法,數據是不會被髮送出去的,而是先存放在 ChannelOutboundBuffer 緩衝區,該緩衝區裏面維護了一個隊列,每調用一次 write()方法,就向隊列中添加一個 entry。對於同一個 NioSocketChannel 而言,可能存在一邊調用 write(),一邊調用 flush()方法,所以就須要區分出這個緩衝隊列中,哪些數據是剛剛 write 進來的,哪些數據是已經被 flush 過了,怎麼區分呢?netty 爲這個緩衝區提供了兩個指針:flushedEntryunflushedEntry,分別指向的是第一個已經被 flush 的數據第一個沒有被 flush 的數據(注意:這裏強調一下一個詞:第一個,由於可能連續有還幾條數據等待被 flush 或者已經被 flush,因此指針指向的是第一個),另外因爲是隊列,咱們須要維護一個指向隊列頭部的指針或者尾部的指針,所以還提供了一個 tailEntry 指針用來指向隊列的尾部。

初始狀態下,這三個指針均指向 null,由於尚未數據沒寫入緩衝隊列。當調用 write()方法和 flush()方法後,三個指針的指向變化關係以下圖所示。

待寫隊列指針指向示意圖

調用 write()方法時,最終會調用 outboundBuffer.addMessage()方法來移動這三個指針,下面看下該方法的源碼。

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 將要寫的數據封裝成一個Entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    // 移動相關的指針
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // 累計要寫的字節數
    incrementPendingOutboundBytes(entry.pendingSize, false);
}
複製代碼

addMessage()方法的邏輯大體能夠分爲兩部分,第一部分:將傳入的數據封裝成一個 Entry,而後移動三個指針,至於如何移動的,能夠參考上面的示意圖,那樣會更清晰。第二部分:累計寫數據的字節數,由於操做系統的套接字緩衝區會有限制,咱們不能不停地向緩衝區中寫入數據,所以 netty 在寫數據時,會累加當前要發送數據的字節數,若是超過了限制,就會將當前 channel 設置爲不可寫狀態,直到要寫的數據量低於某個值後,channel 的狀態又會被設置成可寫狀態。incrementPendingOutboundBytes()方法的源碼以下。

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // 累加字節數
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // 若是待寫的數量超過了設置的最高水位線,那麼就會將channel設置爲不可寫狀態
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}
複製代碼

從代碼中能夠看到,若是累計的字節數超過了緩衝區配置的高水位線,那麼就會將 channel 設置爲不可寫狀態。和高水位線相對應的是低水位線,當要寫的字節數低於低水位線時,channel 會被從新設置成可寫狀態。何時會判斷低水位線呢?那就是在調用 flush()方法時,flush()方法會在下一篇文章中詳細分析。

在 netty 中,默認的高水位線的值是 64KB,低水位線是 32KB。也就是說,當累計要發送的數據大於 64KB 時,channel 會被暫時設置爲不可寫狀態,直到 channel 緩衝區中的數據低於 32KB 時,會從新變爲可寫狀態。

private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
複製代碼

總結

至此,關於寫數據時的 write()方法的源碼就分析完了。

最後總結一下,當調用 channel 的 writeAndFlush()方法發送數據時,會從 pipleline 的 tail 節點開始向前傳播執行 writeAndFlush()方法。先從 tail 節點向前依次調用 OutBound 類型 handler 的 write()方法,對於編碼器而言,在 write()方法中會調用 encode()方法,將數據進行編碼,最終 write()方法會被傳播執行到 head 節點中,在 head 節點中,會將要寫的數據存儲到緩衝區,這個緩衝區是由一個隊列組成的,由 3 個指針來維護關聯關係,每當調用 write()方法或者 flush()方法時,會移動這 3 個指針。最後當等待發送的數據超過 64KB 時,channel 會暫時變爲不可寫狀態,直到堆積的數據量低於 32KB 時,纔會從新變爲可寫狀態。

關於 flush()方法的源碼會在下一篇文章中詳細分析。

推薦

微信公衆號
相關文章
相關標籤/搜索