本文是我對Netty的NioEventLoopGroup及NioEventLoop初始化工做的源碼閱讀筆記, 以下圖,是Netty的Reactor線程模型圖,本文描述NioEventLoopGroup等價於我在圖中標紅的MainReactor組件,全篇圍繞它的初始化展開,不免地方理解的不正確,歡迎留言java
在Nio網絡編程模型的圖示是下面那張圖, 單條Thread全職執行一個Selector,首先是服務端在啓動的時候,會把表明服務端的ServerSockerChannel註冊進Selector,且感興趣的事件是Accept, 一旦有客戶端請求創建鏈接,ServerSockerChannel的accept事件就會被Selector感知到,進行下一步處理算法
對NioEventLoopGroup最感性的認識,是在必定程度上,它實際上是對上圖組件的封裝,那麼封裝了哪些部分呢?編程
NioEventLoopGroup維護的是事件循環,EventLoop, 在Netty的主從Reactor線程模型中,兩個事件循環組其實也是線程組,由於每個EventLoop在他的整個生命週期中都始終和一條線程惟一綁定,EventLoop的線程使用的是它本身封裝的FastThreadLocalThread
, 這條線程使得EventLoop有了處理事件的能力設計模式
NioEventLoopGroup維護的是事件循環,EventLoop,一樣維護着屬於本身的Selector選擇器,這個選擇器使得EventLoop擁有了輪詢綁定在本身身上的Channel的能力. 而且Netty對JDK原生的選擇器作出了升級,使用自定義的數組替換了原生Selector的HashSet集合SelectedKeys
,使得時間的複雜度在任什麼時候刻都是O1api
如今看,每個EventLoop都是一個工做單元, 它的初始化過程是怎樣的? 下面就開始正式的閱讀源碼數組
上圖是一個簡化了體系圖,,慢慢擼出他們的關係安全
這個圖使咱們下面源碼的流程圖, 完成NioEventLoopGroup
的初始化網絡
入口:數據結構
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
通過幾層this()
構造方法的調用,咱們來到它的這個構造方法, 它調用了父類的構造方法,帶上了默認的select策略,以及拒絕執行任務的handler多線程
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { //todo 0 null 根據系統選出 nioXXXprovider 默認的選擇策略 // todo 調用父類的構造方法 MultithreadEventLoopGroup 多線程的事件循環組 super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
接着進入super()
, 進入MultiThreadEventLoopGroup
多線程事件循環組, 它幹了件大事, 初始化了NioEventLoopGroup線程的數量,注意,是數量, 源碼在下面: 固然,咱們通常會把BossGroup的線程數量設置爲1
// todo 當MultithreadEventLoopGroup被加載進 JVM就會執行, 對 DEFAULT_EVENT_LOOP_THREADS進行初始化 static { // todo max方法取最大值, // todo SystemPropertyUtil.getInt,這是個系統輔助類, 若是系統中有 io.netty.eventLoopThreads,就取它, 沒有的話,去後面的值 // todo NettyRuntime.availableProcessors() 是當前的 系統的核數*2 , 在個人電腦上 2*2*2=8條線程 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ // todo 接着 使用父類的構造方法, nThreads= DEFAULT_EVENT_LOOP_THREADS // todo Object... args 是 selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()的簡寫 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
接着進入它的Supper()
來到了它的父類的構造方法,MultiThreadEventLoopExcutorGroup
的構造方法, 這個類是相當重要的類,它作了大量的初始化工做, 整體上看:
EventExecutor[]
這個數組其實就是盛放EventLoop的數組, 當這個for循環結束後,實際上NioEventLoopGroup
就添加完成了EventLoop
NioEventLoopGroup
中選取一個NioEventLoop
處理源碼:
// todo 在這個構造方法中,完成了一些屬性的賦值, 完全構造完成 事件循環組對象 // todo Object... args 是 selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()的簡寫 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { // todo 下面須要的參數,一開始使用無參的構造方法時, 傳遞進來的 就是null ,執行這一行代碼, 建立默認的線程工廠 /// todo ThreadPerTaskExecutor 意味爲當前的事件循環組 建立Executor , 用於 針對每個任務的Executor 線程的執行器 // todo newDefaultThreadFactory根據它的特性,能夠給線程加名字等, // todo 比傳統的好處是 把建立線程和 定義線程須要作的任務分開, 咱們只關心任務, 二者解耦 // todo 每次執行任務都會建立一個線程實體 // todo NioEventLoop 線程命名規則 nioEventLoop-1-XX 1表明是第幾個group XX第幾個eventLoop executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; // todo 循環 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // todo 建立EventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // todo chooser 在這裏 初始化了 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
這個過程當中的細節:
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
默認的線程工廠,建立出的Thread,並非JDK原生的Thread,而是Netty本身封裝的
protected Thread newThread(Runnable r, String name) { System.out.println(threadGroup+" threadGroup"); return new FastThreadLocalThread(threadGroup, r, name); }
ThreadPerTaskExecutor
源碼以下, 能夠看到,它的execute直接關聯着Thread.start()
方法, 一旦執行它就會開啓新的線程, 固然源碼看到這裏時,它是沒有沒執行的,由於線程和NioEventLoop
關聯着,再往下就看NioEventLoop
的實現* todo 這裏實際上使用了設計模式 * todo 1. command是用戶定義的任務, 命令模式; 直觀的 我定義一種任務, 程序不須要知道我執行的命令是什麼,可是當我把任務扔給你, 你幫我執行就行了 * todo 2. 代理設計模型, 代理了ThreadFactory , 把原本給ThreadPerTaskExecutor執行的任務給了ThreadFactory */ public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } // todo 必須實現 Executor 裏面惟一的抽象方法, execute , 執行性 任務 @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
其次,上面的newChild(executor, args);
方法實際上是抽象方法,真正運行時會執行子類NioEventLoopGroup
的實現, 以下:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { // todo !! 真正建立事件循環組的邏輯在這裏!!! return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } // todo 這裏是 它的構造方法 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // todo 進入到父類, 着重看他是如何建立出 TaskQueue的 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } // todo 經常使用的 屬性 provider = selectorProvider; // todo 獲取Selector選擇器 final SelectorTuple selectorTuple = openSelector(); // todo SelectorTuple是netty維護 jdk 原生的Selector的包裝類, 下面看,他有兩個Selector, 一個是通過包裝的,一個是未通過包裝的 selector = selectorTuple.selector; // unwrappedSelector = selectorTuple.unwrappedSelector; // todo Jdk 原生的Selector selectStrategy = strategy; }
繼續跟進去,上面的NioEventLoopGroup的體系圖也就分析到右半部分了,如上圖,是這半部分初始化工做的主要流程, 下面是它的構造方法,能夠看到主要完成了兩件事
細節:
能夠看到,如今已經進入了它的三級父類SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); // todo 保存線程執行器 this.executor = ObjectUtil.checkNotNull(executor, "executor"); // todo 任務隊列 , 進入查看 taskQueue = newTaskQueue(this.maxPendingTasks); System.out.println(taskQueue.getClass()); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
在這個類中進行了以下的工做:
隊列有啥用?
咱們知道,Netty中的線程可不止一個, 多個EventLoop意味着多個線程, 任務隊列的做用就是當其餘線程拿到CPU的執行權時,卻獲得了其餘線程的IO請求,這時當前線程就把這個請求以任務的方式提交到對應線程的任務隊列裏面
建立的什麼任務隊列?
有個誤區, 當我跟進newTaskQueue(this.maxPendingTasks);
方法時, 進入的方法建立了一個LinkedBlockingQueue隊列
, 實際上建立的確是MpscQueue
, 這並不奇怪,是由於NioEventLoop
把這個方法重寫了, 源碼以下:
@Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
這個任務隊列的特性是 非阻塞的,多生產者單消費者, 正好和Netty的線程模型對應
此外,這個SingleThreadEventExecutor還有不少重要的方法
NioEventLoop打開本身的隊列時,作了哪些優化?
經過反射,藉助Java.Security.AccessController提供的線程安全的策略執行機制把原生JDK的selector的SelectedKeys這個HashSet替換成了數組,使得他的事件複雜度在任什麼時候刻都是O1
// todo 這裏進行了 優化, netty把hashSet轉換成了數組, 由於在JDK的NIO模型中,獲取Selector時, Selector裏面內置的存放SelectionKey的容器是Set集合 // todo 而netty把它替換成了本身的數據結構, 數組, 從而使在任何狀況下, 它的時間複雜度都是 O1 private SelectorTuple openSelector() { final Selector unwrappedSelector; try { // todo 使用jdk 的api建立新的 selector unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEYSET_OPTIMIZATION) { // todo 若是不須要優化,就返回原生的selector , 默認爲false 即 使用優化 return new SelectorTuple(unwrappedSelector); } // todo 接下來 netty會用下面這個SelectedSelectionKeySet數據結構 替換原來的 keySet , 進入查看 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // todo 經過反射 sun.nio.ch.SelectorImpl 或者這個類 return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); // todo 判斷是否獲取到了這個類 if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } // todo 肯定是Selector的實現類 換了個名字 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; /** * todo 類java.security.AccessController提供了一個默認的安全策略執行機制,它使用棧檢查來決定潛在不安全的操做是否被容許。 * todo 這個訪問控制器不能被實例化,它不是一個對象,而是集合在單個類中的多個靜態方法。 */ Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // todo 經過反射, 獲取到 selectorImplClass的兩個字段 selectedKeys publicSelectedKeys // todo selectedKeys publicSelectedKeys底層都是 hashSet() 實現的, 如今獲取出來了, 放入上面的數組數據結構中 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); // todo trySetAccessible 能夠強制訪問私有的對象 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField); if (cause != null) { return cause; } // todo trySetAccessible 能夠強制訪問私有的對象 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField); if (cause != null) { return cause; } // todo 真正的把經過反射獲得的 那兩個字段放入咱們本身的數據結構中 // // todo 下面是把咱們的NioEventLoop中的 unwrappedSelector 的 selectedKeysField的屬性 直接設置成 優化後的selectedKeySet selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } // todo 初始化本身維護被選中的key的集合 --> 數組類型的 selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
到如今爲止, NioEventLoopGroup和NioEventLoop就都初始化完成了,固然這是初始化,程序運行到如今,依然只有一條主線程, EventLoop的Thrad還沒start()
幹活,可是起碼已經有能力準備啓動了
總結一下:
就像下面的體系同樣, 五臟俱全