【Netty from the Source】Netty Channel

The Channel's Role

Note: unless stated otherwise, "Channel" in this article refers to the Netty Channel (io.netty.channel).

A week spent studying the Channel family had me questioning everything at one point: is digging into these methods even useful? Is studying Netty a bit like leaving the highway for a country road? Why am I reading source code at all?
Beyond my own overactive imagination, the root cause of these doubts was not having figured out where Channel sits in the Netty architecture; a fuzzy grasp of Netty's own positioning quietly provided the assist.

Some first-principles thinking: Netty is an NIO framework, one grafted on top of java NIO.

At the macro level it can be understood like this, see the figure below:

[figure: Netty layered on top of java NIO]

Before rushing into Channel, let's review how I/O has evolved, focusing on the structural changes in I/O frameworks. Once this part is clear, we will understand the role Channel plays in the I/O world!

The March of I/O

BIO

[figure: BIO model with a thread pool]

What this figure shows is already an optimized BIO: it uses a thread pool. Still, every client costs the server a Thread. Even with the pool optimization, the bounded thread count means a surge of clients will still go hungry.
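
To make that cost concrete, here is a minimal sketch of such a pooled BIO server. The port, pool size, and echo logic are illustrative assumptions, not code from the article:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BioEchoServer {
    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(50); // hard ceiling on concurrent clients
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket socket = server.accept();   // blocks until a client connects
                pool.execute(() -> echo(socket));  // one pooled thread per connection
            }
        }
    }

    private static void echo(Socket socket) {
        try (Socket s = socket;
             InputStream in = s.getInputStream();
             OutputStream out = s.getOutputStream()) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {     // this read blocks: that is the BIO cost
                out.write(buf, 0, n);
            }
        } catch (IOException ignored) {
            // connection dropped; the pooled thread is returned to the pool
        }
    }
}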

NIO

[figure: NIO model]

  • The Acceptor registers with the Selector and listens for accept events
  • When a client connects, an accept event fires
  • The server builds the corresponding Channel and registers it with the Selector, listening for read/write events
  • When a read/write event occurs, the corresponding read/write handling runs (a minimal java.nio sketch follows this list)
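
The four steps above map almost one-to-one onto plain java.nio. A minimal sketch, with the port and buffer size as illustrative assumptions:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioEchoServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);        // step 1: listen for accept events

        while (true) {
            selector.select();                                    // blocks until some event fires
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                         // step 2: a client connected
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ); // step 3: watch read events
                } else if (key.isReadable()) {                    // step 4: handle the read
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int n = client.read(buf);
                    if (n < 0) { client.close(); continue; }
                    buf.flip();
                    client.write(buf);                            // echo back (partial writes ignored in this sketch)
                }
            }
        }
    }
}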

Single-threaded Reactor

[figure: single-threaded Reactor model]

This looks a lot like the NIO model, and naturally has the same problem: a single selector/reactor thread handles all operations on multiple channels, so if event handling on one channel is delayed, every other channel suffers.

Multi-threaded Reactor

Separate out the I/O handling (read/write) and give the non-I/O operations (business logic) a thread pool of their own, and we evolve into the multi-threaded reactor model:

[figure: multi-threaded Reactor model]

With this architecture, the system bottleneck shifts to the Reactor itself. And that hard-working Reactor is currently doing two jobs:
1. Accepting client connection requests
2. Handling I/O-type read/write operations

Main/Sub Reactor

Split the connection-accepting duty out one more time:

[figure: main/sub Reactor model]

Netty is exactly a practitioner of the main/sub reactor model. Recall the server-side bootstrap code:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

...
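
For orientation, here is a hedged sketch of how such a bootstrap is typically completed and started; the channel class, port, and empty child pipeline are illustrative assumptions, not the article's elided code:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DemoServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // main reactor: accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // sub reactors: handle I/O events
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // child pipeline setup goes here (handlers omitted)
                 }
             });
            ChannelFuture f = b.bind(8080).sync();  // boss starts accepting on port 8080
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}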

The channel (java channel) starts appearing in the model diagrams from the NIO era onward; its job is the low-level interactions such as connect, write, read, and close. To summarize: the java channel connects the selector above to the socket below. The netty channel, in turn, treats the java channel as its underlying layer.
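
That layering is visible in the source. Netty 4's AbstractNioChannel (an ancestor of the NIO channel implementations) carries the java channel as a plain field, roughly like this (lightly paraphrased):

// from io.netty.channel.nio.AbstractNioChannel (paraphrased):
// the netty channel holds the java channel underneath and exposes it to subclasses
private final SelectableChannel ch;

protected SelectableChannel javaChannel() {
    return ch;
}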

Source Analysis

Class Structure

With Channel's role clear, let's analyze its commonly used APIs.

First, the class diagram:
[figure: Channel class hierarchy]

Channel actually has another hierarchy inside it, the Unsafe family:
[figure: Unsafe class hierarchy]

Unsafe is a nested interface of Channel; the heavy lifting of interacting with the java channel ultimately falls on Unsafe.

The write method

write only stores the data in the ChannelOutboundBuffer; nothing is actually sent yet. Only when flush is called is the data written to the java channel and sent to the peer.
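
Seen from the user side, the contract looks like this; a hypothetical handler sketch (FlushDemoHandler is not from the article):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class FlushDemoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);            // only queued in the ChannelOutboundBuffer; nothing on the wire yet
        ctx.flush();               // now the buffered data is written to the java channel
        // ctx.writeAndFlush(msg); // the equivalent one-step alternative
    }
}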

Below is the write method from AbstractChannel (strictly, from its inner class AbstractUnsafe), with the noteworthy spots annotated:

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);   // wrap/convert the message, e.g. into a ByteBuf
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);    // append msg to the ChannelOutboundBuffer
}

In the last line above, msg is written to the ChannelOutboundBuffer's tail node tailEntry, and unflushedEntry is set to stage it as pending. Expanded, the code looks like this:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {    // comment 1: mark the data as "unflushed"
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(entry.pendingSize, false);
}

The ChannelOutboundBuffer class

A brief note on the ChannelOutboundBuffer class; as usual, start with the class javadoc.

/**
 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
 * outbound write requests.
 *
 *  ...omitted
 */

As mentioned earlier, write stores the message in the ChannelOutboundBuffer as a staging step; the subsequent flush pushes it on to the java channel and ultimately to the client.

A diagram makes this easier to follow:
[figure: flushedEntry / unflushedEntry / tailEntry inside ChannelOutboundBuffer]

The three fields shown in the figure are the key players as data travels through write -> ChannelOutboundBuffer -> flush. And what is an Entry? A static inner class of ChannelOutboundBuffer, and a classic linked-list node:

static final class Entry {

    Entry next;

    // omitted...
}

The end of the write method (at "comment 1") calls outboundBuffer.addMessage(msg, size, promise), which has already assigned the Entry wrapping msg to tailEntry and unflushedEntry; the flush method, via outboundBuffer.addFlush() (below, at "comment 2"), then indirectly hands unflushedEntry over to flushedEntry:

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
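
To make the pointer handoff concrete, here is a walkthrough of the buffer state after three write() calls and then one addFlush(); the entry names e1..e3 are illustrative:

// after write(m1); write(m2); write(m3):
//   flushedEntry   = null
//   unflushedEntry = e1    (set once, at "comment 1")
//   tailEntry      = e3    (list: e1 -> e2 -> e3)
//
// after addFlush() (at "comment 2"):
//   flushedEntry   = e1    (taken over from unflushedEntry)
//   unflushedEntry = null  (reset; later writes start a new unflushed run)
//   tailEntry      = e3    (flushed == 3, so flush0() will now try to drain e1..e3)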

The flush method

Let's start directly from the flush method in AbstractChannel (strictly, in its inner class AbstractUnsafe); starting from Channel's flush would go through the pipeline and a long call chain, omitted here:

public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();    // comment 2: mark the data as "flushed"
    flush0();    // do the actual writing
}

outboundBuffer.addFlush() has already been analyzed. Following the call chain flush0 -> doWrite, let's look at AbstractNioByteChannel's doWrite method:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount();    // spin count capping the loop iterations; default 16
    do {
        Object msg = in.current();    // the msg of flushedEntry
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}

writeSpinCount is a spin counter, similar in spirit to a spin lock: it prevents the current I/O thread from endlessly executing writes (because of network conditions and the like), which would leave the thread effectively hung and waste resources.
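
The spin count is configurable per channel. A minimal sketch of raising it via ChannelOption.WRITE_SPIN_COUNT (the value 32 is an arbitrary illustration; the trade-off is more write attempts per flush versus event-loop fairness):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SpinCountConfig {
    public static void main(String[] args) {
        ServerBootstrap b = new ServerBootstrap();
        b.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
         .channel(NioServerSocketChannel.class)
         .childOption(ChannelOption.WRITE_SPIN_COUNT, 32); // default is 16
    }
}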

Now examine doWriteInternal; as before, the key spots are annotated:

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {    // readable iff writerIndex - readerIndex > 0
            in.remove();
            return 0;
        }

        final int localFlushedAmount = doWriteBytes(buf);   // bytes actually written to the java channel
        if (localFlushedAmount > 0) {   // the write made progress
            in.progress(localFlushedAmount);
            /**
             * 1. Everything was written: call in.remove()
             * 2. "Half-write" (partial write): just return 1.
             *    The caller's spin counter writeSpinCount drops (e.g. to 15) and the loop calls this method again
             */
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
    
        // FileRegion ("file-type") message handling omitted...
        
    } else {
        // Should not reach here.
        throw new Error();
    }
    return WRITE_STATUS_SNDBUF_FULL;    // send buffer is full; value = Integer.MAX_VALUE
}

Back in doWrite, the final statement is incompleteWrite(writeSpinCount < 0):

protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}

The design here is quite interesting:

  • If setOpWrite = (writeSpinCount < 0) = true, i.e. doWriteInternal returned WRITE_STATUS_SNDBUF_FULL (the send buffer is full), then set the write-interest flag:
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}

In effect this sets the OP_WRITE bit on the SelectionKey, so the write is retried on the selector's (reactor's) next poll. Its counterpart, clearOpWrite(), is sketched at the end of this section.

  • If setOpWrite = (writeSpinCount < 0) = false, i.e. doWriteInternal returned 1 each time yet 16 half-writes still did not get the whole message out, then flush runs again as a task re-submitted to the event loop:
public Channel flush() {
    pipeline.flush();
    return this;
}

Conclusion: in the first case the send buffer is full and no more data can be written, so we pin our hopes on the selector's next poll; in the second case the message likely went unsent only because the spin limit ran out, so the flush task goes straight back onto the event loop, no selector round-trip needed.
This is clearly an optimization; details like this are where the craftsmanship shows!
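
As promised above, the counterpart of setOpWrite(): once doWrite() has written everything, it calls clearOpWrite() to drop the flag again, so the selector stops waking the event loop for a channel with nothing left to send. Sketched here, lightly paraphrased from the same class:

protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // the key may already have been cancelled during deregistration from the EventLoop
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}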
