在上節咱們知道Netty啓動後會動起一個selector線程監聽IO事件,IO事件包括如下幾個:java
讀事件便可以發生在客戶端也可會發生在服務端,當客戶端或服務端註冊讀事件並接受到遠端發送的數據就會觸發讀事件。編程
寫事件便可以發生在客戶端也可會發生在服務端,寫事件能夠由外部直接調用觸發,當出現寫半包時(出如今TCP緩存滿的狀況),Netty會註冊寫操做位,待TCP緩存消耗後也會觸發寫事件。數組
只發生在服務端,服務端啓動的時候會註冊接收操做位監聽客戶端的鏈接。promise
只發生在客戶端,客戶端啓動時會嘗試鏈接服務端,鏈接是異步的不必定立刻成功不成功則須要註冊鏈接操做位監聽客戶端的鏈接成功。緩存
下面從服務端的角度介紹Netty啓動後,接收客戶端鏈接的流程,以及客戶端鏈接上後服務端的讀和寫的流程。多線程
當Selector輪詢到接收事件會在NioEventLoop類中的processSelectedKey方法中進行處理,源碼以下:併發
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } ... }
服務端的接收邏輯直接委託給unsafe.read()處理,unsafe中有2個實現類NioMessageUnsafe和NioByteUnsafe,因爲服務端啓動初始化的Channel用的是NioServerSocketChannel,因此unsafe的實現類是NioMessageUnsafe,下面看下unsafe.read()的實現:異步
public void read() { ... try { for (;;) { //獲取接收結果 int localRead = doReadMessages(readBuf); //若是接收結果爲空直接推出 if (localRead == 0) { break; } //異常狀況下返回,tcp協議未用到 if (localRead < 0) { closed = true; break; } //非自動讀,退出並去掉監聽客戶端的鏈接事件,變成手工註冊,通常不用 if (!config.isAutoRead()) { break; } //每波的最大處理鏈接請求數默認爲16 if (readBuf.size() >= maxMessagesPerRead) { break; } } } catch (Throwable t) { exception = t; } setReadPending(false); int size = readBuf.size(); //調用pipeline鏈處理客戶端鏈接事件 for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } //清理接收對象 readBuf.clear(); //調用pipeline鏈處理接收完成事件 pipeline.fireChannelReadComplete(); ... }
接收客戶端的處理邏輯主要流程以下:tcp
SocketChannel是服務端和客戶端通信的核心操做類,pipeline.fireChannelRead方法在以前講過是一個調用鏈,調用用戶的配置ChannelHandler,這裏系統會調用初始化channel時系統自動註冊的ServerBootstrapAcceptor裏的channelRead方法(初始化channel流程能夠閱讀上節內容)ide
channelRead是Netty的核心代碼主要對SocketChannel進一步封裝使剝離AcceptorSelector線程,獨立出跟客戶端通信IOSelector線程。咱們來看下它的實現:
public void channelRead(ChannelHandlerContext ctx, Object msg) { //獲取與客戶端通信的通道SocketChannel(下面叫childChannel) final Channel child = (Channel) msg; //將用戶配置的處理器childChannel設置到childChannel child.pipeline().addLast(childHandler); //將用戶配置的系統參數設置到childChannel for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } //將用戶配置的屬性設置到childChannel for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //在從線程池裏註冊childChannel childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
channelRead流程比較簡單就是在從線程池註冊childChannel,而後從線程池起相應的selector線程處理服務端和客戶端的讀事件和寫事件。
這裏childChannel的註冊流程和服務端啓動時的channel的註冊流程基本同樣不過這裏默認自動註冊SelectionKey.OP_READ讀事件而不是SelectionKey.OP_ACCEPT接收事件。須要注意的是從線程池的每一個線程會建立一個selector對象而一個selector可能註冊多個childChannel。
完成上面的流程客戶端就能夠跟服務端通信了,若是客戶端發送了數據,服務端的從selector線程就會了輪詢到讀事件,一樣讀事件會在NioEventLoop類中的processSelectedKey方法中進行處理,源碼以下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } ... }
能夠看到讀事件源碼跟接收事件的源碼是同一塊,只是unsafe實現的類不同,因爲childChannel的實現類是NioSocketChannel所以unsafe的實現類是NioByteUnsafe,下面重點分析下unsafe.read()的實現:
@Override public final void read() { final ChannelConfig config = config(); //readPending狀態是非自動讀狀況下使用,readPending若是是false表示數據已讀完移除讀操做位 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); //獲取ByteBuf構造器 final ByteBufAllocator allocator = config.getAllocator(); //獲取自動讀模式下的一次性讀取的最大的次數 final int maxMessagesPerRead = config.getMaxMessagesPerRead(); //獲取ByteBuf容量分配器 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { //構造ByteBuf byteBuf = allocHandle.allocate(allocator); //獲取byteBuf最大可寫字節 int writable = byteBuf.writableBytes(); //將接收到的字段寫入到byteBuf,並獲取接收數據長度 int localReadAmount = doReadBytes(byteBuf); //未讀到數據釋放byteBuf,跳出讀取邏輯 if (localReadAmount <= 0) { byteBuf.release(); byteBuf = null; close = localReadAmount < 0; break; } //數據讀完設置readPending爲false if (!readPendingReset) { readPendingReset = true; setReadPending(false); } //交給用戶配置的數據解析器(ChannelHandler)處理讀到的數據 pipeline.fireChannelRead(byteBuf); byteBuf = null; //避免溢出,若是讀取的數據量大於Integer的最大值則直接退出 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { totalReadAmount = Integer.MAX_VALUE; break; } //加上此次讀到的數據量 計算出總數據量 totalReadAmount += localReadAmount; //非自動讀,退出 if (!config.isAutoRead()) { break; } //若是讀到的數據量小於byteBuf最大可寫字節 說明數據已經接受完,退出循環 if (localReadAmount < writable) { break; } //不然數據還未讀完,繼續讀,直到讀完或者讀取次數大於最大次數 } while (++ messages < maxMessagesPerRead); //觸發讀取完成的處理器 pipeline.fireChannelReadComplete(); //記錄此次讀取數據的總量,以便後續動態建立byteBuf的大小 allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }
unsafe.read()的實現比較複雜咱們按步驟來分析
allocator做用是構造怎麼樣類型的ByteBuf好比默認就是構造了UnpooledUnsafeDirectByteBuf,UnpooledUnsafeDirectByteBuf提供了非池化的堆外內存直接操做的支持。
allocHandle做用是分配ByteBuf的容量,allocHandle有2個實現類分別是FixedRecvByteBufAllocator和AdaptiveRecvByteBufAllocator,FixedRecvByteBufAllocator實現比較簡單,根據用戶的配置分配固定的容量,AdaptiveRecvByteBufAllocator是默認的實現,它會根據上次分配的容量動態調整大小。
static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 1024; static final int DEFAULT_MAXIMUM = 65536; private static final int INDEX_INCREMENT = 4; private static final int INDEX_DECREMENT = 1;
AdaptiveRecvByteBufAllocator的最小容量爲64字節,默認初始容量爲1024,最大容量爲65536字節,其擴展步伐值爲4,收縮步伐值爲1。
static { List<Integer> sizeTable = new ArrayList<Integer>(); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }
AdaptiveRecvByteBufAllocator初始化了53個容量選項當容量小於512字節時以16字節的步伐增加,當容量大於512字節時,容量以上一個容量的2倍增加。
咱們重點分析下擴容策略方法record()
public void record(int actualReadBytes) { //本次讀取的總容量與上次容量收縮後的前一個位置的容量比較,若是比它還小或相等說明還在收縮 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { //是否持續在收縮,是的話上次容量收縮1個步伐 if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; //不是持續收縮,標記下,不作收縮處理 } else { decreaseNow = true; } //若是當前容量大於上次的容量,則擴容4個步伐 } else if (actualReadBytes >= nextReceiveBufferSize) { index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }
record()的參數actualReadBytes表示本次讀取的總容量。以上代碼的總結以下:
容量的收縮如下2個條件 - 1.若是本次讀取的總容量比上次容量收縮後的前一個位置的容量還小或相等。 - 2.容量至少持續2次在收縮。 容量的擴展如下1個條件 - 1.若是本次讀取的總容量大於上次的容量。
以上分析了構造ByteBuf,下面繼續分析unsafe.read()方法。
將接收到的字段寫入到byteBuf,並獲取接收數據長度。
若是未讀到數據,說明數據已經讀物,釋放byteBuf緩存,跳出讀取循環邏輯。
這裏的doReadBytes可能返回0,大於0和-1,等於0說明已無數據可讀,大於0表示讀到數據,-1表示讀取異常須要設置close標誌爲true用於關閉鏈接。
避免溢出,若是讀取的數據量大於Integer的最大值則直接跳出讀取循環邏輯。
交給用戶配置的數據解析器(ChannelHandler)處理讀到的數據。
這裏交給數據解析器數據,不必定是你想要的完整的數據,可能出現半包和粘包的現象,這須要數據解析器處理才能得到完整數據。
累計數據總量totalReadAmount,以便後面擴容策略用。
非自動讀,跳出讀取循環邏輯。
非自動狀況下須要編程人員本身註冊讀操做位,才能觸發讀事件
若是讀到的數據量小於byteBuf最大可寫字節 說明數據已經接受完,跳出讀取循環邏輯。
若是讀到的數據量等於byteBuf最大可寫字節,說明TCP緩存區還能還有數據,須要再次循環去讀。
這裏最大的循環次數模式是16次(可配)若是超過這個次數不管TCP緩存區是否還要數據都會終止循環,等下個selector週期再去讀。
觸發讀取完成的處理器。
這裏的處理器須要編程人員本身配置。
記錄此次讀取數據的總量,以便後續動態建立byteBuf的大小,動態擴容上面已經講過,這裏不累述。
若是讀取發生IO異常,則關閉鏈接。(上面講過的讀取狀態返回-1而且close標誌設置爲true的狀況)。
以上就是整個讀事件的整個流程,下面來分析下寫事件流程。
服務端通常在接受到數據處理結束後給客戶端端返回一個響應數據,發送響應數據則須要調用ChannelHandlerContext#writeAndFlush方法。咱們之外部調用ChannelHandlerContext#writeAndFlush(如下稱ctx.writeAndFlush)方法爲例來分析寫事件流程,ctx.writeAndFlush方法看字面意思就是寫入和刷新,ctx.writeAndFlush的寫入是不會真正的發送,而是存到緩存中,刷新後才從緩存拿出數據發送。ctx.writeAndFlush也是調用鏈,開發人員能夠實現ChannelOutboundHandler裏的
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
方法捕獲寫數據事件或者實現
void flush(ChannelHandlerContext ctx) throws Exception;
方法來捕獲刷新數據事件。
ctx.writeAndFlush的真正實現是其私有方法write,我看來看下write的源碼實現:
private void write(Object msg, boolean flush, ChannelPromise promise) { //獲取下個ChannelOutboundHandler的包裝ChannelHandlerContext AbstractChannelHandlerContext next = findContextOutbound(); //獲取ChannelHandlerContext裏分配的線程 EventExecutor executor = next.executor(); //若是是跟當前線程同一個 if (executor.inEventLoop()) { //直接調用write next.invokeWrite(msg, promise); //若是須要刷新調用flush if (flush) { next.invokeFlush(); } //若是非同一個線程,須要異步處理 } else { //獲取可讀取數據量 int size = channel.estimatorHandle().size(msg); if (size > 0) { ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); //由於異步發送事先增長待發送緩存量,佔用空間 if (buffer != null) { buffer.incrementPendingOutboundBytes(size); } } Runnable task; //若是須要刷新,建立帶刷新的任務 if (flush) { task = WriteAndFlushTask.newInstance(next, msg, size, promise); //不然建立不帶刷新的任務 } else { task = WriteTask.newInstance(next, msg, size, promise); } safeExecute(executor, task, promise, msg); } }
實現比較簡單,取鏈中下個ChannelHandlerContext,這裏的ChannelHandlerContext是ChannelHandler的包裝類,維護了ChannelHandlerContext的next和prev節點。
若是ChannelHandlerContext中配置的線程跟當前是同線程則同步調用寫和刷新的方法,這裏若是ChannelHandlerContext中沒有配置線程默認取的channel裏的線程。
若是不是當前線程就要建立任務異步執行寫和刷新的方法,這裏若是是異步執行的話會事先增長待發送緩存量預佔用空間,待要執行寫的方法的時候會還原緩存佔用空間。
增長待發送緩存量主要是爲了反正發送的數據太大致使緩存消耗速度小於寫入速度,若是超過用戶配置的大小(默認64k),會給用戶配置的處理器發報警,這裏涉及Netty緩存設計,不過多介紹。
netty寫的過程會調用用戶配置的處理器,這裏編程人員能夠實現ChannelOutboundHandler的
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
方法來捕獲寫事件(好比對數據進行編碼)。固然寫的最終流程是在channel的unsafe裏執行:
public final void write(Object msg, ChannelPromise promise) { ... int size; try { //包裝msg msg = filterOutboundMessage(msg); //獲取數據大小 size = estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //寫入緩存 outboundBuffer.addMessage(msg, size, promise); }
寫入緩存前,會對msg進行包裝,這裏的包裝主要將存儲在Java堆內存的待數據寫入到堆外內存,存在堆外內存好處就是真正的發送時減小一次堆內拷貝到堆外的過程,提高發送效率。
最後將轉化好的msg寫入到outboundBuffer緩存,這裏的寫入也會執行一次增長待發送緩存量操做,因此上面講的在異步寫入的寫入操做真正執行前會會還原緩存佔用空間,爲的就是避免重複的增長待發送緩存量操做。
刷新的過程成也是同樣,會先調用用戶配置的處理器,這裏編程人員能夠實現ChannelOutboundHandler的
void flush(ChannelHandlerContext ctx) throws Exception;
方法來捕獲刷新事件,固然寫的最終流程也是在channel的unsafe裏執行:
public final void flush() { //獲取緩存容器 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //若是緩存已經釋放,則退出 if (outboundBuffer == null) { return; } //準備待發送數據 outboundBuffer.addFlush(); //刷新 flush0(); }
比較簡單,先獲取緩存而後準備待發送數據最後調用flush0()刷新,這裏須要注意的是發送相關的處理不要配置成多線程,這裏會出現併發準備待發送數據的問題。
下面來看下flush0()的實現:
protected void flush0() { //避免再次進入 if (inFlush0) { return; } //獲取緩存容器 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; //若是當前通道(channel)已經關閉,或斷開鏈接,則執行刪除當前待發送數據操做。 if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { //發送數據操做 doWrite(outboundBuffer); } catch (Throwable t) { //出現IO異常而且配置自動關閉則關閉全部 if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, false); //不然執行刪除當前待發送數據操做。 } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
flush0()主要有2個操做
咱們重點分析下doWrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { //獲取須要待發送數據個數 int size = in.size(); //若是待發送數據爲空,清理寫操做位並退出 if (size == 0) { clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; //獲取待發送數據 ByteBuffer[] nioBuffers = in.nioBuffers(); //獲取待發送數據總個數 int nioBufferCnt = in.nioBufferCount(); //獲取待發送數據總字節數 long expectedWrittenBytes = in.nioBufferSize(); //獲取JDK的SocketChannel SocketChannel ch = javaChannel(); switch (nioBufferCnt) { //若是爲0可能除了ByteBuffers類型外還要其餘類型要發送,交給父類處理 case 0: super.doWrite(in); return; //若是是單個待發送數據,調用JDK的SocketChannel單個發送方法 case 1: ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; //若是是多個待發送數據,調用JDK的SocketChannel多個發送方法 default: //發送嘗試,默認嘗試16次 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { //調用JDK的SocketChannel的write發送數據 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); //若是發送的量爲0,通道不可寫,TCP緩存隊列已滿,設置寫操做位標記setOpWrite爲true,退出發送 if (localWrittenBytes == 0) { setOpWrite = true; break; } //指望發送字節數減掉已經成功發送字節數 expectedWrittenBytes -= localWrittenBytes; //累加已發送字節數 writtenBytes += localWrittenBytes; //指望發送字節數爲0,說明已經發送完畢,設置發送結束標誌done爲true,退出發送 if (expectedWrittenBytes == 0) { done = true; break; } } break; } // 釋放徹底寫入的緩衝區,並更新部分寫入的緩衝區的索引。 in.removeBytes(writtenBytes); //若是數據未發送完,處理未發送完成邏輯 if (!done) { incompleteWrite(setOpWrite); break; } } }
doWrite方法是一個大循環,它每次處理完會再試,直到沒有須要待發送數據,就是刪除寫操做位(若是有)。
doWrite發送數據前先獲取3個局部變量:
待發送數據數組nioBuffers。
獲取待發送數據總個數nioBufferCnt。
獲取期待待發送數據總字節數expectedWrittenBytes。
根據待發送數據總個數nioBufferCnt,發送要分3種狀況。
若是nioBufferCnt爲0,說明待發送的ByteBuffers類型數據爲0,但可能除了ByteBuffers類型外還要其餘類型要發送,交給父類處理。
若是nioBufferCnt爲1,說明待發送的ByteBuffer數據只有1個,調用JDK的SocketChannel的實現單個數據發送的write方法。
若是nioBufferCnt大於1,說明待發送的ByteBuffer數據有多個,調用JDK的SocketChannel的實現多個數據發送的write方法。
第1種通常不會出現咱們不作分析,重點分析下第2第3種狀況,第2第3種方法除了調用JDK的SocketChannel的write方法實現不同,其餘邏輯徹底相同。
發送的邏輯可能會出現一次發送不徹底的狀況這裏默認嘗試16次發送(可配),最終會出現3種最終發送狀況:
發送數據完成。
發送徹底失敗,出現通道不可寫狀況。
嘗試16次發送後只是部分發送成功,出現通道不可寫或發送數據量太大狀況。
第1和第3種狀況是有數據發送成功的狀況,因此發送完後會釋放徹底發送成功的緩衝區,並更新部分發送成功的緩衝區的索引。
第2和第3狀況是有產生數據未發送成功的狀況,因此會調用incompleteWrite作相應的後續處理。
咱們來看下incompleteWrite方法實現:
protected final void incompleteWrite(boolean setOpWrite) { // 若是出現通道不可寫狀況,則註冊寫操做位由selector異步輪詢到OP_WRITE事件的時候調用foreceFlush進行flush if (setOpWrite) { setOpWrite(); //若是出現數據量太大狀況,放入channel線程中排隊處理未發送數據,以便在此期間能夠執行其餘任務 } else { Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
incompleteWrite方法有2種狀況:
若是出現通道不可寫狀況,則註冊寫操做位由selector異步輪詢到OP_WRITE事件的時候去刷新。
出現不可寫狀況一般都是TCP緩存空間滿了,只有TCP緩存預留空間大於發送低潮限度時纔會觸發OP_WRITE事件,因此出現通道不可寫狀況必須註冊寫操做位交給系統來判斷TCP緩存空間是否可寫。
若是出現數據量太大狀況,放入channel線程中排隊去刷新未發送數據,這樣以便在此期間能夠執行其餘任務。
以上就是對Netty的IO事件的分析