Netty源碼分析第五章: ByteBufhtml
第十節: SocketChannel讀取數據過程java
咱們第三章分析過客戶端接入的流程, 這一小節帶你們剖析客戶端發送數據, Server讀取數據的流程:redis
首先舒適提示, 這一小節高度耦合第三章的第1, 2節的內容, 不少知識這裏並不會重複講解, 若是對以前的知識印象不深入建議惡補第三章的第1, 2節的內容以後再學習這一小節api
咱們首先看NioEventLoop的processSelectedKey方法:oop
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //獲取到channel中的unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //若是這個key不是合法的, 說明這個channel可能有問題
if (!k.isValid()) { //代碼省略
} try { //若是是合法的, 拿到key的io事件
int readyOps = k.readyOps(); //連接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //寫事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //讀事件和接受連接事件 //若是當前NioEventLoop是work線程的話, 這裏就是op_read事件 //若是是當前NioEventLoop是boss線程的話, 這裏就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 這裏的判斷表示輪詢到大事件是op_read或者op_accept事件源碼分析
以前的章節分析過, 若是當前NioEventLoop是work線程的話, 那麼這裏就是op_read事件, 也就是讀事件, 表示客戶端發來了數據流學習
這裏會調用unsafe的redis()方法進行讀取spa
若是是work線程, 那麼這裏的channel是NioServerSocketChannel, 其綁定的unsafe是NioByteUnsafe, 這裏會走進NioByteUnsafe的read()方法中:.net
public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
首先獲取SocketChannel的config, pipeline等相關屬性線程
final ByteBufAllocator allocator = config.getAllocator(); 這一步是獲取一個ByteBuf的內存分配器, 用於分配ByteBuf
這裏會走到DefaultChannelConfig的getAllocator方法中:
public ByteBufAllocator getAllocator() { return allocator; }
這裏返回的DefualtChannelConfig的成員變量, 咱們看這個成員變量:
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
這裏調用ByteBufAllocator的屬性DEFAULT, 跟進去:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
咱們看到這裏又調用了ByteBufUtil的靜態屬性DEFAULT_ALLOCATOR, 再跟進去:
static final ByteBufAllocator DEFAULT_ALLOCATOR;
DEFAULT_ALLOCATOR這個屬性是在static塊中初始化的
咱們跟到static塊中:
static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; //代碼省略
}
首先判斷運行環境是否是安卓, 若是不是安卓, 在返回"pooled"字符串保存在allocType中
而後經過if判斷, 最後局部變量alloc = PooledByteBufAllocator.DEFAULT, 最後將alloc賦值到成員變量DEFAULT_ALLOCATOR
咱們跟到PooledByteBufAllocator的DEFAULT屬性中:
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
咱們看到這裏直接經過new的方式, 建立了一個PooledByteBufAllocator對象, 也就是基於申請一塊連續內存進行緩衝區分配的緩衝區分配器
緩衝區分配器的知識, 咱們以前小節進行了詳細的剖析, 這裏就再也不贅述
回到NioByteUnsafe的read()方法中:
public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
這裏 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle() 是建立一個handle, 咱們以前的章節講過, handle是對RecvByteBufAllocator進行實際操做的對象
咱們跟進recvBufAllocHandle:
public RecvByteBufAllocator.Handle recvBufAllocHandle() { //若是不存在, 則建立一個handle的實例
if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
這裏是咱們以前剖析過的邏輯, 若是不存在, 則建立handle的實例, 具體建立過程咱們能夠回顧第三章的第二小節, 這裏就再也不贅述
一樣allocHandle.reset(config)是將配置重置, 第三章的第二小節也對其進行過剖析
重置完配置以後, 進行do-while循環, 有關循環終止條件allocHandle.continueReading(), 以前小節也有過詳細剖析, 這裏也再也不贅述
在do-while循環中, 首先看 byteBuf = allocHandle.allocate(allocator) 這一步, 這裏傳入了剛纔建立的allocate對象, 也就是PooledByteBufAllocator:
這裏會跑到DefaultMaxMessagesRecvByteBufAllocator類的allocate方法中:
public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess()); }
這裏的guess方法, 會調用AdaptiveRecvByteBufAllocator的guess方法:
public int guess() { return nextReceiveBufferSize; }
這裏會返回AdaptiveRecvByteBufAllocator的成員變量nextReceiveBufferSize, 也就是下次所分配緩衝區的大小, 根據咱們以前學習的內容, 第一次分配的時候會分配初始大小, 也就是1024字節
回到DefaultMaxMessagesRecvByteBufAllocator類的allocate方法中:
這樣, alloc.ioBuffer(guess())就會分配一個PooledByteBuf
咱們跟到AbstractByteBufAllocator的ioBuffer方法中:
public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); }
這裏首先判斷是否能獲取jdk的unsafe對象, 默認爲true, 因此會走到directBuffer(initialCapacity)中, 這裏最終會分配一個PooledUnsafeDirectByteBuf對象, 具體分配流程咱們再以前小節作過詳細剖析
回到NioByteUnsafe的read()方法中:
分配完了ByteBuf以後, 再看這一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):
首先看參數doReadBytes(byteBuf)方法, 這步是將channel中的數據讀取到咱們剛分配的ByteBuf中, 並返回讀取到的字節數
這裏會調用到NioSocketChannel的doReadBytes方法:
protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
首先拿到綁定在channel中的handler, 由於咱們已經建立了handle, 因此這裏會直接拿到
再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())這步, byteBuf.writableBytes()返回byteBuf的可寫字節數, 也就是最多能從channel中讀取多少字節寫到ByteBuf, allocate的attemptedBytesRead會把可寫字節數設置到DefaultMaxMessagesRecvByteBufAllocator 類的attemptedBytesRead屬性中
跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead咱們會看到:
public void attemptedBytesRead(int bytes) { attemptedBytesRead = bytes; }
繼續看doReadBytes方法:
最後, 經過byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())將jdk底層的channel中的數據寫入到咱們建立的ByteBuf中, 並返回實際寫入的字節數
回到NioByteUnsafe的read()方法中:
繼續看allocHandle.lastBytesRead(doReadBytes(byteBuf))這步
剛纔咱們剖析過doReadBytes(byteBuf)返回的是世界寫入ByteBuf的字節數
再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:
public final void lastBytesRead(int bytes) { lastBytesRead = bytes; totalBytesRead += bytes; if (totalBytesRead < 0) { totalBytesRead = Integer.MAX_VALUE; } }
這裏會賦值兩個屬性, lastBytesRead表明最後讀取的字節數, 這裏賦值爲咱們剛纔寫入ByteBuf的字節數, totalBytesRead表示總共讀取的字節數, 這裏將寫入的字節數追加
繼續看NioByteUnsafe的read()方法:
若是最後一次讀取數據爲0, 說明已經將channel中的數據所有讀取完畢, 將新建立的ByteBuf釋放循環利用, 並跳出循環
allocHandle.incMessagesRead(1)這步是增長消息的讀取次數, 由於咱們循環最多16次, 因此當增長消息次數增長到16會結束循環
讀取完畢以後, 會經過pipeline.fireChannelRead(byteBuf)將傳遞channelRead事件, 有關channelRead事件, 咱們在第四章也進行了詳細的剖析
這裏讀者會有疑問, 若是一次讀取不完, 就傳遞channelRead事件, 那麼server接收到的數據有可能就是不完整的, 其實關於這點, netty也作了相應的處理, 咱們會在以後的章節詳細剖析netty的半包處理機制
循環結束後, 會執行到allocHandle.readComplete()這一步
咱們知道第一次分配ByteBuf的初始容量是1024, 可是初始容量不必定必定知足全部的業務場景, netty中, 將每次讀取數據的字節數進行記錄, 而後以後次分配ByteBuf的時候, 容量會盡量的符合業務場景所須要大小, 具體實現方式, 就是在readComplete()這一步體現的
咱們跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:
public void readComplete() { record(totalBytesRead()); }
這裏調用了record方法, 而且傳入了這一次所讀取的字節總數
跟到record方法中:
private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }
首先看判斷條件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)])
這裏index是當前分配的緩衝區大小所在的SIZE_TABLE中的索引, 將這個索引進行縮進, 而後根據縮進後的因此找出SIZE_TABLE中所存儲的內存值, 再判斷是否大於等於此次讀取的最大字節數, 若是條件成立, 說明分配的內存過大, 須要縮容操做, 咱們看if塊中縮容相關的邏輯
首先 if (decreaseNow) 會判斷是否馬上進行收縮操做, 一般第一次不會進行收縮操做, 而後會將decreaseNow設置爲true, 表明下一次直接進行收縮操做
假設須要馬上進行收縮操做, 咱們看收縮操做的相關邏輯:
index = Math.max(index - INDEX_DECREMENT, minIndex) 這一步將索引縮進一步, 但不能小於最小索引值
而後經過 nextReceiveBufferSize = SIZE_TABLE[index] 獲取設置索引以後的內存, 賦值在nextReceiveBufferSize, 也就是下次須要分配的大小, 下次就會根據這個大小分配ByteBuf了, 這樣就實現了縮容操做
再看 else if (actualReadBytes >= nextReceiveBufferSize)
這裏判斷此次讀取字節的總量比上次分配的大小還要大, 則進行擴容操做
擴容操做也很簡單, 索引步進, 而後拿到步進後的索引所對應的內存值, 做爲下次所須要分配的大小
再NioByteUnsafe的read()方法中:
通過了縮容或者擴容操做以後, 經過pipeline.fireChannelReadComplete()傳播ChannelReadComplete()事件
以上就是讀取客戶端消息的相關流程
第五章總結
本章主要剖析了ByteBuf的基本操做以及緩衝區分配等相關知識.
緩衝區分配, 分爲經過調用jdk的api的方式和分配一塊連續內存的方式
其中, 經過分配連續內存的方式分配緩衝區中, 又介紹了在page級別分配的邏輯和在subpage級別分配的邏輯
page級別分配時經過操做內存二叉樹的方式記錄分配狀況
subpage級別分配是經過位圖的方式記錄分配狀況
最後介紹了NioSocketChannel處理讀事件的相關邏輯
整體來講, 這一章的內容難度是比較大的, 但願同窗課後經過多調試的方式進行熟練掌握