注意:如無特別說明,文中的Channel都指的是Netty Channel(io.netty.channel)html
一週時間的Channel家族
學習,一度讓我懷疑人生——研究這個方法有沒有用?學習Netty是否是有點兒下了高速走鄉間小路的意思?我爲啥要讀源碼?
之因此產生這些疑問,除了我自己心理活動豐富之外,主要病因在於沒搞清楚Channel在Netty體系中的定位。而沒能清晰理解Netty的定位,也默默的送出了一記助攻。java
做些本質思考:Netty是一個NIO框架,是一個嫁接在java NIO基礎上的框架。react
宏觀上能夠這麼理解,見下圖:git
先不急着聊Channel,回顧下IO演進過程,重點關注IO框架的結構變化。搞懂了這部分後,咱們將明白Channel在IO世界中扮演的角色!github
此圖展現的已經算是優化後的BIO了——用到了線程池。顯然,每個client都須要server端付出一個Thread的代價,即便你經過線程池作了優化,因爲受到線程個數的制約,激增的客戶端依舊錶現的「慾求不滿」。api
與NIO模型類似,固然也就有和NIO一樣的問題:selector/reactor單個線程處理多個channel的各類操做,若是其中一個channel的事件處理延緩了,將影響其它channel。promise
將read/write這種io處理操做分隔出來,非io型操做(業務操做)配備以線程池,進化成reactor多線程模型:服務器
這樣的架構,系統瓶頸轉移至Reactor部分。而目前勞苦功高的Reactor做了兩件事:
1.接收客戶端連接請求
2.處理IO型讀寫操做網絡
將接收client連接的功能再次拆分出來:多線程
Netty偏偏就是主從Reactor模型的實踐者,想一想服務端建立時的代碼:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) ...
從nio時代的模型圖上開始出現channel(java channel),它的定位就是進行諸如connect、write、read、close等底層交互。歸納一下,java channel是上承selector下連socket的存在。而netty channel,則把java channel看成了底層。
清楚了Channel的定位,接下來對其經常使用api進行分析。
首先拍出類圖:
其實Channel內部還有一套體系,Unsafe
家族:
Unsafe是Channel的內置類(接口),與java channel交互的重任最終會落到Unsafe身上。
write
只是將數據寫入到了ChannelOutboundBuffer
中,並無真正的發送出去,到flush
方法調用時,才寫入到java channel中發送給對方。
下面列出AbstractChannel
的write方法,值得關注的地方已打上中文註釋:
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); //做消息的包裝,轉換成ByteBuf等 size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); //msg消息寫入ChannelOutboundBuffer }
上述代碼最後一行,msg寫入了ChannelOutboundBuffer的尾節點tailEntry
,同時將unflushedEntry
賦值暫存。代碼展開以下:
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { //註釋1、標記成「未刷新」的數據 unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); }
這裏對ChannelOutboundBuffer
類進行簡單說明,按慣例先看類註釋。
/** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. * * 省略... */
前文提到過,write方法將消息寫到ChannelOutboundBuffer,算是數據暫存;以後的flush
再將消息刷到java channel乃至客戶端。
來張示意圖,方便理解:
圖中列出的三個屬性,在write->ChannelOutboundBuffer->flush
的數據流轉過程當中比較關鍵。Entry是啥?ChannelOutboundBuffer的靜態內部類,典型的鏈表結構數據:
static final class Entry { Entry next; // 省略... }
write方法的最後部分(註釋一位置)調用outboundBuffer.addMessage(msg, size, promise)
,已將封裝msg的Entry賦值給tailEntry和unflushedEntry
;而flush方法,經過調用outboundBuffer.addFlush()
(下文,註釋二位置),將unflushedEntry間接賦值給了flushedEntry
。
public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }
直接從AbstractChannel的flush方法開始(若以Channel的flush爲開端會經pipeline,將有很長調用鏈,省略):
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); //註釋2、標記成「已刷新」數據 flush0(); //數據處理 }
outboundBuffer.addFlush()
方法已經分析過了,跟蹤調用鏈flush0->doWrite
,咱們看下AbstractNioByteChannel的doWrite
方法:
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = config().getWriteSpinCount(); //自旋計數,限制循環次數,默認16 do { Object msg = in.current(); //flushedEntry的msg if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } writeSpinCount -= doWriteInternal(in, msg); } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }
writeSpinCount
是個自旋計數,相似於自旋鎖的設定,防止當前IO線程因爲網絡等緣由無盡執行寫操做,而使得線程假死,形成資源浪費。
觀察doWriteInternal
方法,關鍵處依舊中文註釋伺候:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (!buf.isReadable()) { //writerIndex - readerIndex >0 ? true: flase in.remove(); return 0; } final int localFlushedAmount = doWriteBytes(buf); //返回實際寫入到java channel的字節數 if (localFlushedAmount > 0) { //寫入成功 in.progress(localFlushedAmount); /** * 1.已經所有寫完,執行in.remove() * 2.「寫半包」場景,直接返回1。 * 外層方法的自旋變量writeSpinCount遞減成15,輪詢再次執行本方法 */ if (!buf.isReadable()) { in.remove(); } return 1; } } else if (msg instanceof FileRegion) { //「文件型」消息處理邏輯省略.. } else { // Should not reach here. throw new Error(); } return WRITE_STATUS_SNDBUF_FULL; //發送緩衝區滿,值=Integer.MAX_VALUE }
回到doWrite方法,最後執行了incompleteWrite(writeSpinCount < 0)
:
protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { setOpWrite(); } else { // Schedule flush again later so other tasks can be picked up in the meantime Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
這裏的設定挺有意思:
protected final void setOpWrite() { final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } }
其實就是設置SelectionKey的OP_WRITE操做位,在selector/reactor下次輪詢的時候,將再次執行寫操做
public Channel flush() { pipeline.flush(); return this; }
結論:前者因爲發送緩衝區滿,已沒法寫入數據,因而繼但願於selector的下次輪詢;後者則可能只是由於自旋次數少,引發的數據發送不徹底,直接將任務再次放入pipeline,而無需等待selector。
這無疑是種優化,細節之處,功力盡顯!