在上一章中,咱們觀摩完了輪詢註冊到reactor線程對用的selector上的全部的channel的IO事件的過程,此次,咱們繼續瞭解下處理IO事件的過程:java
二、處理IO事件react
在NioEventLoop類中,run方法執行完select方法後,將執行processSelectedKeys方法,該方法就是處理IO事件的方法:git
還記得這個selectedKeys是啥不?咱們在第一章的時候,說NioEventLoopGroup初始化的時候見到了該變量的初始化了的。啥?忘了,不要緊,再瞅下:github
提示到這裏應該記得了吧,沒錯,就是netty優化了的selectedKeySet,用數組替換了HastSet的。因爲第一章已經看過,忘了的同窗,請自行觀看。。。數組
回到 processSelectedKeys方法,若是輪詢到了有IO事件的話,則執行processSelectedKeysOptimized方法,那麼咱們進入該方法瞅瞅:異步
首先遍歷selectedKeys數組,而後取出該數組中的成員:jvm
並將該數組成員的位置賦值爲null,主要是便於jvm的回收。而後final Object a = k.attachment();獲取NioServerSocketChannel。oop
很明顯會執行processSelectedKey方法,咱們看下這個方法:優化
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
首先判斷這個SelectionKey是否合法,若是合法,獲取IO事件 int readyOps = k.readyOps();,後面就是對相應的事件進行相應的處理邏輯。回到processSelectedKeysOptimized方法,執行完processSelectedKey方法後,netty會執行如下代碼:this
因爲在run方法中調用以前會設置needsToSelectAgain變量爲false,因此暫時不會執行:
那何時執行呢?
查看下哪裏調用了該方法?
AbstractNioChannel類中的doDeregister:
所以能夠看出,在channel從selector上移除的時候,會調用cancel方法,key.cancel()會將key進行取消。而後將cancelledKeys自增。當cancelledKeys大於等於CLEANUP_INTERVAL時,會將needsToSelectAgain值爲true;回到processSelectedKeysOptimized這個方法中,若needsToSelectAgain爲true,則調用selectedKeys.reset(i + 1);
該方法就是將數組從i開始,之後的值都設置爲null,就是清空selectedKeys數組。清空完數組後,重新選擇,selectAgain();,重新遍歷selectedKeys數組。總結下,這麼作的目的就是每隔256次斷開channel連接,就會清理一次selectedKeys數組。至此,處理IO事件的流程咱們已經觀摩完了,下一章咱們來了解下處理異步任務隊列。