1、NioEventLoop的概述數組
NioEventLoop作爲Netty線程模型的核心部分,從本質上講是一個事件循環執行器,每一個NioEventLoop都會綁定一個對應的線程經過一個for(;;)
循環來處理與 Channel 相關的 IO 操做, 包括 調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等;其次做爲任務隊列, 執行 taskQueue 中的任務, 例如eventLoop.schedule 提交的定時任務也是這個線程執行的。而NioEventLoopGroup顧名思義,它是維護了一組這樣的事件循環器,這也是Netty基於Reactor模式的具體設計體現。app
接下來咱們就結合具體的代碼,對NioEventLoop的整個建立流程進行一個說明與總結ide
2、NioEventLoop的建立函數
咱們基於Netty構建服務端仍是客戶端時,都首先須要建立NioEventLoopGroup 實例oop
// Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup 作爲基於NIO的處理channle相關IO操做的事件循環器組,它的類層次結構以下優化
經過NioEventLoopGroup構造函數傳入線程數量this
/** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
NioEventLoopGroup最終的構造函數中會包含如下幾個函數spa
一、nThreads:傳入的線程數量.net
二、executor :線程執行器Executor接口,默認爲空線程
三、selectorProvider:用於建立Selector的SelectorProvider
四、selectStrategyFactory:傳入DefaultSelectStrategyFactory.INSTANCE, 一個使用默認選擇策略的工廠。
五、RejectedExecutionHandlers.reject():Netty自定義線程拒絕策略
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
在父類MultithreadEventLoopGroup中,會根據你傳入nThreads大小,肯定初始化的線程數量,爲0且沒有設置io.netty.eventLoopThreads參數項,則會以當前系統的核心線程數*2作爲默認的線程數量
static { //若是沒有設置io.netty.eventLoopThreads參數項,則會以當前運行系統的核心線程數*2做爲線程數 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
接下來在MultithreadEventExecutorGroup的構造函數中咱們會根據傳入的線程數,去初始化和建立一組NioEventLoop
首先咱們看下NioEventLoop的類層次結構
下面在MultithreadEventExecutorGroup構造函數中主要完成如下幾個功能:
一、初始化ThreadPerTaskExecutor線程執行器,並傳入一個線程建立工廠,用於NioEventLoop對應線程的建立
二、根據傳入的線程數,初始化一個EventExecutor數組,用於放置建立的NioEventLoop對象
三、循環數組,經過newChild方法建立NioEventLoop對象。
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { // 建立線程工廠,netty根據須要指定了線程的命名方式、優先級、是不是守護線程等屬性 // 該線程池沒有任何隊列,提交任務後,建立任何線程類型都是 FastThreadLocalRunnable, 而且當即start。 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //初始化一組事件循環執行器 children = new EventExecutor[nThreads]; //根據傳入的線程數,初始化一個線程數組 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 建立 new NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
繼續跟蹤進入newChild(executor, args)內部,看到它會返回一個NioEventLoop對象
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
繼續查看NioEventLoop構造函數和他的父類構造函數
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
父類構造函數
/** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param executor the {@link Executor} which will be used for executing * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
經過上面的代碼咱們能夠看到,初始化NioEventLoop主要完成了如下的功能
一、保存線程執行器ThreadPerTaskExecutor
二、建立一個selector
三、基於LinkedBlockingQueue建立一個taskQueue任務隊列,用於保存要執行的任務
這些都是爲了後續的循環執行Channel 相關事件所作準備。
到這裏其實咱們建立了一組NioEventLoop,也就是一組事件循環執行器,每一個NioEventLoop中都有對應的一個線程和一個selector ;建立完畢以後,天然就是要爲每個鏈接分配對應的NioEventLoop。Netty中經過
實現EventLoopGroup接口中的next()方法來返回一個可使用的的NioEventLoop
public interface EventLoopGroup extends EventExecutorGroup { /** * Return the next {@link EventLoop} to use */ @Override EventLoop next(); }
在MultithreadEventExecutorGroup中咱們能夠查看它的具體實現方式
chooser = chooserFactory.newChooser(children); @Override public EventExecutor next() { return chooser.next(); }
進入代碼內部咱們能夠看到Netty針對數組大小,對數組下標的計算方式進行了優化
/** * Default implementation which uses simple round-robin to choose next {@link EventExecutor}. */ @UnstableApi public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { //判斷是不是二的次冪,若是爲true返回PowerOfTwoEventExecutorChooser,反之GenericEventExecutorChooser if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } //經過&運算的方式循環獲取數組下標 @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } //經過取模的方式循環獲取數組下標 @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
到此咱們基本把Netty中NioEventLoop及NioEventLoopGroup的建立流程及核心代碼梳理了一遍。NioEventLoop作爲Netty線程模型的核心部分包含的內容比較多,上面只是初始化及建立的一部份內容,後續的部分我會陸續的補齊,其中有錯誤和不足之處還請指正與海涵。