Netty源碼分析第三章: 客戶端接入流程html
第二節: 處理接入事件之handle的建立數組
上一小節咱們剖析完成了與channel綁定的ChannelConfig初始化相關的流程, 這一小節繼續剖析客戶端鏈接事件的處理oop
回到上一章NioEventLoop的processSelectedKey ()方法:源碼分析
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //獲取到channel中的unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //若是這個key不是合法的, 說明這個channel可能有問題
if (!k.isValid()) { //代碼省略
} try { //若是是合法的, 拿到key的io事件
int readyOps = k.readyOps(); //連接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //寫事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //讀事件和接受連接事件 //若是當前NioEventLoop是work線程的話, 這裏就是op_read事件 //若是是當前NioEventLoop是boss線程的話, 這裏就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
咱們看其中的if判斷:this
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
上一小節咱們分析過, 若是當前NioEventLoop是work線程的話, 這裏就是op_read事件, 若是是當前NioEventLoop是boss線程的話, 這裏就是op_accept事件, 這裏咱們以boss線程爲例進行分析spa
以前咱們講過, 不管處理op_read事件仍是op_accept事件, 都走的unsafe的read()方法, 這裏unsafe是經過channel拿到, 咱們知道若是是處理accept事件, 這裏的channel是NioServerSocketChannel, 這裏與之綁定的unsafe是NioMessageUnsafe線程
咱們跟到NioMessageUnsafe的read()方法:rest
public void read() { //必須是NioEventLoop方法調用的, 不能經過外部線程調用
assert eventLoop().inEventLoop(); //服務端channel的config
final ChannelConfig config = config(); //服務端channel的pipeline
final ChannelPipeline pipeline = pipeline(); //處理服務端接入的速率
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設置配置
allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底層的channel //readBuf用於臨時承載讀到連接
int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的連接進行計數
allocHandle.incMessagesRead(localRead); //鏈接數是否超過最大值
} while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條客戶端鏈接
for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將建立NioSokectChannel進行傳遞 //最終會調用ServerBootstrap的內部類ServerBootstrapAcceptor的channelRead()方法
pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代碼省略
} finally { //代碼省略
} }
首先獲取與NioServerSocketChannel綁定config和pipeline, config咱們上一小節進行分析過, pipeline咱們將在下一章進行剖析code
咱們看這一句:htm
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
這裏經過RecvByteBufAllocator接口調用了其內部接口Handler
咱們看其RecvByteBufAllocator接口:
public interface RecvByteBufAllocator { Handle newHandle(); interface Handle { int guess(); void reset(ChannelConfig config); void incMessagesRead(int numMessages); void lastBytesRead(int bytes); int lastBytesRead(); void attemptedBytesRead(int bytes); int attemptedBytesRead(); boolean continueReading(); void readComplete(); } }
咱們看到RecvByteBufAllocator接口只有一個方法newHandle(), 顧名思義就是用於建立Handle對象的方法, 而Handle中的方法, 纔是實際用於操做的方法
在RecvByteBufAllocator實現類中包含Handle的子類, 具體實現關係以下:
3-2-1
回到read()方法中再看這段代碼:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
unsafe()返回當前channel綁定的unsafe對象, recvBufAllocHandle()最終會調用AbstractChannel內部類AbstractUnsafe的recvBufAllocHandle()方法
跟進AbstractUnsafe的recvBufAllocHandle()方法:
public RecvByteBufAllocator.Handle recvBufAllocHandle() { //若是不存在, 則建立一個recvHandle的實例
if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
若是若是是第一次執行到這裏, 自身屬性recvHandle爲空, 會建立一個recvHandle實例, config()返回NioServerSocketChannel綁定的ChannelConfig, getRecvByteBufAllocator()獲取其RecvByteBufAllocator對象, 這兩部分上一小節剖析過了, 這裏經過newHandle()建立一個Handle, 這裏會走到AdaptiveRecvByteBufAllocator類中的newHandle()方法中
跟進newHandle()方法中:
public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); }
這裏建立HandleImpl傳入了三個參數, 這三個參數咱們上一小節剖析過, minIndex爲最小內存在SIZE_TABLE中的下標, maxIndex爲最大內存在SEIZE_TABEL中的下標, initial是初始內存, 咱們跟到HandleImpl的構造方法中:
public HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; }
初始化minIndex和maxIndex, 根據initial找到當前的下標, nextReceiveBufferSize是根據當前的下標找到對應的內存
這樣, 咱們就建立了個Handle對象
在這裏咱們須要知道, 這個handle, 是和channel惟一綁定的屬性, 而AdaptiveRecvByteBufAllocator對象是和ChannelConfig對象惟一綁定的, 間接也是和channel進行惟一綁定
繼續回到read()方法:
public void read() { //必須是NioEventLoop方法調用的, 不能經過外部線程調用
assert eventLoop().inEventLoop(); //服務端channel的config
final ChannelConfig config = config(); //服務端channel的pipeline
final ChannelPipeline pipeline = pipeline(); //處理服務端接入的速率
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設置配置
allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底層的channel //readBuf用於臨時承載讀到連接
int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的連接進行計數
allocHandle.incMessagesRead(localRead); //鏈接數是否超過最大值
} while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條客戶端鏈接
for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將建立NioSokectChannel進行傳遞 //最終會調用ServerBootstrap的內部類ServerBootstrapAcceptor的channelRead()方法
pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代碼省略
} finally { //代碼省略
} }
繼續往下跟:
allocHandle.reset(config);
這個段代碼是從新設置配置, 也就是將以前的配置信息進行初始化, 最終會走到, DefaultMaxMessagesRecvByteBufAllocator中的內部類MaxMessageHandle的reet中
咱們跟進reset中:
public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }
這裏僅僅對幾個屬性作了賦值, 簡單介紹下這幾個屬性:
config:當前channelConfig對象
maxMessagePerRead:表示讀取消息的時候能夠讀取幾回(循環次數), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead屬性, 上一小節已經作過剖析
totalMessages:表明目前讀循環已經讀取的消息個數, 在NIO傳輸模式下也就是已經執行的循環次數, 這裏初始化爲0
totalBytesRead:表明目前已經讀取到的消息字節總數, 這裏一樣也初始化爲0
咱們繼續往下走, 這裏首先是一個do-while循環, 循環體裏經過int localRead = doReadMessages(readBuf)這種方式將讀取到的鏈接數放入到一個List集合中, 這一步咱們下一小節再分析, 咱們繼續往下走:
咱們首先看allocHandle.incMessagesRead(localRead)這一步, 這裏的localRead表示此次循環往readBuf中放入的鏈接數, 在Nio模式下這, 若是讀取到一條鏈接會返回1
跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:
public final void incMessagesRead(int amt) { totalMessages += amt; }
這裏將totalMessages增長amt, 也就是+1
這裏totalMessage, 剛纔已經剖析過, 在NIO傳輸模式下也就是已經執行的循環次數, 這裏每次執行一次循環都會加一
再去看循環終止條件allocHandle.continueReading()
跟到MaxMessageHandle的continueReading()方法中:
public boolean continueReading() { //config.isAutoRead()默認返回true // totalMessages < maxMessagePerRead //totalMessages表明當前讀到的連接, 默認是1 //maxMessagePerRead每一次最大讀多少連接(默認16)
return config.isAutoRead() && attemptedBytesRead == lastBytesRead && totalMessages < maxMessagePerRead && totalBytesRead < Integer.MAX_VALUE; }
咱們逐個分析判斷條件:
config.isAutoRead(): 這裏默認爲true
attemptedBytesRead == lastBytesRead: 表示本次讀取的字節數和最後一次讀取的字節數相等, 由於到這裏都沒有進行字節數組的讀取操做, 因此默認都爲0, 這裏也返回true
totalMessages < maxMessagePerRead: 表示當前讀取的次數是否小於最大讀取次數, 咱們知道totalMessages每次循環都會自增, 而maxMessagePerRead默認值爲16, 因此這裏會限制循環不能超過16次, 也就是最多一次只能讀取16條鏈接
totalBytesRead < Integer.MAX_VALUE: 表示讀取的字節數不能超過int類型的最大值
這裏就剖析完了Handle的建立和初始化過程, 而且剖析了循環終止條件等相關的邏輯