文本分享Netty中NioEventLoop的具體實現與Accept事件處理過程。
源碼分析基於Netty 4.1java
前面文章說過了,NioEventLoop能夠視爲一個線程,Netty會將register,bind等操做做爲一個任務提交給EventLoop,而EventLoop的啓動正是在提交任務的execute方法中git
public void execute(Runnable task) { ... boolean inEventLoop = inEventLoop(); addTask(task); // #1 if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } } // #2 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
#1
若當前線程非EventLoop執行線程,startThread會啓動一個新的線程,執行run方法。這裏就是啓動EventLoop了。#2
若當前EventLoop執行線程阻塞等待IO事件(Selector#select方法),喚醒它執行任務github
NioEventLoop中維護了一個關注事件Key集合SelectedSelectionKeySet,它使用數組代替原(Jvm)Selector的中的HashSet,提升性能。數組默認大小爲1024,不夠用時容量*2。
另外NioEventLoop還維護了(jvm)Selector和優化後的SelectedSelectionKeySetSelector,SelectedSelectionKeySetSelector每次調用select前都清除SelectedSelectionKeySet。
NioEventLoop#構造方法 -> NioEventLoop#openSelector數組
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { // #1 unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } ... final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // #2 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); ... selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } ... } }); ... selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); // #3 return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
#1
經過(jvm)SelectorProvider打開一個(jvm)Selector#2
構造了SelectedSelectionKeySet對象selectedKeySet,並經過反射將該對象設置到(jvm)Selector的selectedKeys,publicSelectedKeys屬性中,這樣Selector監聽到的事件就會存儲到selectedKeySet。#3
構造了SelectedSelectionKeySetSelector對象
NioEventLoop#unwrappedSelector維護的是JVM的Selector對象,NioEventLoop#selector則是Netty的SelectedSelectionKeySetSelector對象微信
下面看一下Netty的核心方法,NioEventLoop#runapp
protected void run() { for (;;) { try { // #1 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // #2 select(wakenUp.getAndSet(false)); // #3 if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; // #4 if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { // #5 if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
run方法就是事件循環機制的具體實現,經過一個for循環不斷處理任務和IO事件。#1
若是當前存在任務,調用selector.selectNow(),這時跳出switch循環,處理事件和任務,不然返回SelectStrategy.SELECT#2
select方法會調用(jvm)Selector.select,阻塞線程,等待事件#3
預喚醒線程
因爲頻繁調用(jvm)Selector.wakeup會形成性能消耗,NioEventLoop維護了一個喚醒標識wakenUp。
NioEventLoop#wakeup方法中,只有wakenUp.compareAndSet(false, true)
成功才調用selector.wakeup()jvm
考慮這樣的場景,select方法阻塞期間,有新任務加入,這時wakenUp會被設置爲true。
任務處理完後,從新執行到Run方法#1
,#2
步驟間,若是這時有任務加入,因爲wakenUp已是true,沒法調用selector.wakeup(),致使該任務大量延遲。
爲了不這種狀況,Netty作了一次預喚醒。ide
#4
ioRatio表示執行IO事件所佔百分比,默認50,
這裏根據ioRatio,使用processSelectedKeys方法處理IO事件,runAllTasks方法處理Task,注意方法參數,經過ioTime,ioRatio計算任務應該執行的時間。#5
檢查狀態,若是Netty中止,則關閉全部Channeloop
NioEventLoop#select源碼分析
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { // #1 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // #2 ... // #3 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; // #4 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } if (Thread.interrupted()) { ... selectCnt = 1; break; } // #5 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // #6 ... rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } ... } catch (CancelledKeyException e) { ... } }
select方法中使用for循環不斷阻塞線程,直達發送IO事件或加入了新任務。#1
delayNanos方法計算延遲任務隊列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),沒有任務則返回1s。
阻塞時間timeoutMillis,爲第一個延遲任務的到期時間和當前時間差,加上預留時間5毫秒
timeoutMillis小於等於0(延遲任務到期了),跳出for循環,直接返回#2
這裏的處理邏輯在Netty5.0.0.Alpha2版本移除,這裏也不列出#3
調用select方法,阻塞線程等待IO時間或被喚醒#4
當前已經發生了IO事件或者加入了任務,延遲任務(被喚醒),跳出for循環,直接返回#5
舊版本的Java NIO在Linux Epoll實現上存在bug,(jvm)Selector.select方法可能在沒有任何就緒事件的狀況下返回,致使CPU空轉,利用率飆升到100%。
因而,Netty計算select方法重複調用次數selectCnt,並在selectCnt大於SELECTOR_AUTO_REBUILD_THRESHOLD配置(默認512)時,重建selector,從而規避該問題。
這裏因爲select方法阻塞時間大於等於timeoutMillis,則認爲當前非空轉,selectCnt次數重置。#6
selectCnt大於SELECTOR_AUTO_REBUILD_THRESHOLD,重建Selector,將原Selector中註冊的Channle和關注事件Key轉移到新的Selector,並經過selectNow方法刷新selectedKeys。
幸虧在JDK6_6u4,JDK7_b12已修復該Bug。
NioEventLoop#processSelectedKeys -> (沒有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); // #1 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // #2 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // #3 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
#1
在調用read方法或write方法前,須要先調用finishConnect方法,不然(jvm)Channel可能拋出NotYetConnectedException異常#2
先處理OP_WRITE事件,可以儘早寫入數據釋放內存#3
處理OP_READ或OP_ACCEPT事件
下面咱們來看一下AcceptGroup中如何處理ServerChannel上監聽到的accept事件。
NioEventLoop#processSelectedKey方法#3
步驟 -> NioMessageUnsafe#read(Channel,UnSafe各實現類關係請查看前面文章)
public void read() { ... try { try { do { // #1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // #2 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); // #3 pipeline.fireChannelReadComplete(); ... } ... }
#1
調用NioServerSocketChannel#doReadMessages,處理Accept事件。
注意,readBuf是一個List,用於接收處理結果。#2
觸發DefaultChannelPipeline#fireChannelRead#3
觸發DefaultChannelPipeline#fireChannelReadComplete
NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception { // #1 SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { // #2 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ... } return 0; }
#1
調用(jvm)ServerSocketChannel#accept方法,生成的(jvm)SocketChannel#2
使用(jvm)SocketChannel構造NioSocketChannel
前面文章說過,ServerChannel註冊到AcceptGroup時,會給ServerChannel的ChannelPipeline添加一個ServerBootstrapAcceptor,用於處理accept事件。
DefaultChannelPipeline#fireChannelRead -> ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) { // #1 final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { // #2 childGroup.register(child).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
#1
注意msg參數,就是NioServerSocketChannel#doReadMessages方法中生成的NioSocketChannel。
上一篇文章已經說過,ServerBootstrap#init方法中會將ServerBootstrap的childHandler,currentChildHandler,currentChildOptions,currentChildAttrs變量都交給ServerBootstrapAcceptor。
這裏配置NioSocketChannel的Options,Attribute,並將childHandler添加給pipeline。
(咱們通常使用ChannelInitializer做爲childHandler,添加實際處理邏輯的ChannelHandler)#2
將NioSocketChannel註冊到ReadGroup中,註冊過程相似於NioServerSocketChannel註冊到AcceptGroup類型,調用AbstractUnsafe#register方法實現。
但有一點不一樣,調用AbstractNioChannel#doBeginRead方法註冊關注事件Key時,關注事件Key(即AbstractNioChannel#readInterestOp),是來自子類AbstractNioByteChannel#構造方法,固定爲SelectionKey.OP_READ。(關於Channel註冊的內容,請參考 -- Netty Server啓動原理解析)
到這裏,(jvm)SocketChannel已經註冊到ReadGroupo維護中(jvm)Selector,關注的事件Key爲read。
若是您以爲本文不錯,歡迎關注個人微信公衆號,您的關注是我堅持的動力!