前言html
NioEventLoopGroup是netty對Reactor線程組這個抽象概念的具體實現,其內部維護了一個EventExecutor數組,而NioEventLoop就是EventExecutor的實現(看名字也可發現,一個是NioEventLoopGroup,一個是NioEventLoop,前者是集合,後者是集合中的元素)。一個NioEventLoop中運行着惟一的一個線程即Reactor線程,這個線程一直執行NioEventLoop的run方法。這個run方法就是netty的核心方法,其重要性能夠類比於Spring中的refresh方法。java
下面是從百度上隨便找的一篇netty文章的線程模型圖(詳見文章http://www.javashuo.com/article/p-wevkifbp-ch.html),此處引用是爲方便在頭腦中產生一個總體印象,結合圖下面的代碼進行各個概念的歸位。圖中綠色的Reactor Thread就是上文說的NioEventLoopGroup,對應下面代碼中的boss變量,負責處理客戶端的鏈接事件,它其實也是一個池(由於內部維護的是一個數組);藍色的Reactor Thread Pool也是NioEventLoopGroup,對應下面代碼中的worker變量,負責處理客戶端的讀寫事件。bootstrap
注:上圖是Reactor多線程模型,而下面的代碼示例是主從多線程模型,區別是隻要將代碼boss中的參數2改爲1,示例代碼就成了多線程模型,細細品味一下。數組
1 public class NettyDemo1 { 2 // netty服務端的通常性寫法 3 public static void main(String[] args) { 4 EventLoopGroup boss = new NioEventLoopGroup(2); 5 EventLoopGroup worker = new NioEventLoopGroup(); 6 try { 7 ServerBootstrap bootstrap = new ServerBootstrap(); 8 bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) 9 .option(ChannelOption.SO_BACKLOG, 100) 10 .childHandler(new ChannelInitializer<SocketChannel>() { 11 @Override 12 protected void initChannel(SocketChannel socketChannel) throws Exception { 13 ChannelPipeline pipeline = socketChannel.pipeline(); 14 pipeline.addLast(new StringDecoder()); 15 pipeline.addLast(new StringEncoder()); 16 pipeline.addLast(new NettyServerHandler()); 17 } 18 }); 19 ChannelFuture channelFuture = bootstrap.bind(90); 20 channelFuture.channel().closeFuture().sync(); 21 } catch (Exception e) { 22 e.printStackTrace(); 23 } finally { 24 boss.shutdownGracefully(); 25 worker.shutdownGracefully(); 26 } 27 } 28 }
以上部分是博主對netty的一個歸納性總結,以將概念和其實現鏈接起來,方便創建一個初始的整體認識,下面進入EventLoopGroup的初始化。多線程
1、EventLoopGroup初始化app
一、NioEventLoopGroup構造器框架
順着有參和無參的構造方法進去,發現無參的構造器將線程數賦值0繼續調了有參的構造器,而有參的構造器將線程池executor參數賦值null繼續調重載構造器socket
1 public NioEventLoopGroup() { 2 this(0); 3 }
1 public NioEventLoopGroup(int nThreads) { 2 this(nThreads, (Executor) null); 3 }
1 public NioEventLoopGroup(int nThreads, Executor executor) { 2 this(nThreads, executor, SelectorProvider.provider()); 3 }
由於博主是在筆記本電腦調試的,故此時的selectorProvider是WindowsSelectorProvider,而後又加了一個參數DefaultSelectStrategyFactory單例對象:ide
1 public NioEventLoopGroup( 2 int nThreads, Executor executor, final SelectorProvider selectorProvider) { 3 this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); 4 }
而後調父類的構造器,在末尾增長一個參數RejectedExecutionHandler單例對象:oop
1 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, 2 final SelectStrategyFactory selectStrategyFactory) { 3 super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); 4 }
二、MultithreadEventLoopGroup構造器
在該構造器中,對線程數參數進行了處理,若是是0(對應上面NioEventLoopGroup的無參構造器),則將線程數設置爲默認值,默認值取的是CPU核數*2,8核處理器對應16個線程;若是不是0,則以指定的線程數爲準。同時,將executor後面的參數變爲數組的形式,對應上面能夠知道args中有三個元素:WindowsSelectorProvider、DefaultSelectStrategyFactory、RejectedExecutionHandler。
1 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { 2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); 3 }
三、MultithreadEventExecutorGroup構造器
此構造器又在args數組前面加了一個單例對象DefaultEventExecutorChooserFactory,用於從NioEventLoopGroup的數組中選取一個NioEventLoop。
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { 2 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); 3 }
下面纔是最終的核心構造器方法,結合註釋應該比較好理解。其中最重要的是第3步和第4步,下面着重講解這兩步。
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2 EventExecutorChooserFactory chooserFactory, Object... args) { 3 // 1.對線程數進行校驗 4 if (nThreads <= 0) { 5 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); 6 } 7 // 2.給線程池參數賦值,從前面追蹤可知,若未賦值,executor一直是null,後續用於建立NioEventLoop中的啓動線程,因此這玩意就是一個線程工廠 8 if (executor == null) { 9 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 10 } 11 // 3.給children循環賦值,newChild方法是重點,後續會講解 *** 12 children = new EventExecutor[nThreads]; 13 for (int i = 0; i < nThreads; i ++) { 14 boolean success = false; 15 try { 16 children[i] = newChild(executor, args); 17 success = true; 18 } catch (Exception e) { 19 // TODO: Think about if this is a good exception type 20 throw new IllegalStateException("failed to create a child event loop", e); 21 } finally { 22 // 省略掉未建立成功後的資源釋放處理 23 } 24 } 25 // 4.完成chooser選擇器的賦值,此處是netty一個小的優化點,後續會講解 ** 26 chooser = chooserFactory.newChooser(children); 27 // 5.給數組中每個成員設置監聽器處理 28 final FutureListener<Object> terminationListener = new FutureListener<Object>() { 29 @Override 30 public void operationComplete(Future<Object> future) throws Exception { 31 if (terminatedChildren.incrementAndGet() == children.length) { 32 terminationFuture.setSuccess(null); 33 } 34 } 35 }; 36 37 for (EventExecutor e: children) { 38 e.terminationFuture().addListener(terminationListener); 39 } 40 // 6.設置一個只讀的set集合 41 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); 42 Collections.addAll(childrenSet, children); 43 readonlyChildren = Collections.unmodifiableSet(childrenSet); 44 }
3.1)、第4步chooser的賦值
由上面構造器調用過程可知,chooserFactory對應DefaultEventExecutorChooserFactory對象,該對象的newChooser方法以下:
1 public EventExecutorChooser newChooser(EventExecutor[] executors) { 2 if (isPowerOfTwo(executors.length)) { 3 return new PowerOfTwoEventExecutorChooser(executors); 4 } else { 5 return new GenericEventExecutorChooser(executors); 6 } 7 }
邏輯比較簡單,判斷數組的長度是否是2的N次冪,若是是,返回PowerOfTwoEventExecutorChooser對象,若是不是則返回GenericEventExecutorChooser對象。這兩者有什麼區別,netty設計者爲何要這麼作呢?若是對HashMap的實現原理有深刻了解的園友應該不難想到,若是一個數X是2的N次冪,那麼用任意一個數Y對X取模能夠用Y&(X-1)來高效的完成,這樣作比直接%取模快了好幾倍,這也是HashMap用2次冪做爲數組長度的主要緣由。這裏是一樣的道理,以下代碼所示,這兩個chooser類都很簡單,內部維護了一個原子遞增對象,每次調用next方法都加1,而後用這個數與數組長度取模,獲得要對應下標位置的元素。而若是數組長度恰好是2次冪,用PowerOfTwoEventExecutorChooser就會提升效率,若是不是那也沒辦法,走%取模就是了。netty這種對效率提高的處理,是否在平時的CRUD中也能套用一下呢?
1 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { 2 private final AtomicInteger idx = new AtomicInteger(); 3 private final EventExecutor[] executors; 4 5 PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { 6 this.executors = executors; 7 } 8 9 @Override 10 public EventExecutor next() { 11 return executors[idx.getAndIncrement() & executors.length - 1]; 12 } 13 } 14 15 private static final class GenericEventExecutorChooser implements EventExecutorChooser { 16 private final AtomicInteger idx = new AtomicInteger(); 17 private final EventExecutor[] executors; 18 19 GenericEventExecutorChooser(EventExecutor[] executors) { 20 this.executors = executors; 21 } 22 23 @Override 24 public EventExecutor next() { 25 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; 26 } 27 }
3.2)、第3步newChild方法的邏輯
該方法的實如今NioEventLoopGroup中,因爲args長度爲3,因此queueFactory爲null(暫時未發現哪裏的實現args參數長度會是4,或許只是爲後續擴展用,若是園友對args長度爲4的場景有了解的還請留言指教)。而後調用了NioEventLoop的構造器,下面進入NioEventLoop的初始化。
1 protected EventLoop newChild(Executor executor, Object... args) throws Exception { 2 EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; 3 return new NioEventLoop(this, executor, (SelectorProvider) args[0], 4 ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); 5 }
執行完上述初始化方法後NioEventLoopGroup的快照圖以下,最重要的就兩個屬性:child和chooser。
2、NioEventLoop的初始化
一、NioEventLoop的構造器
到這裏,有必要將此構造器的入參再梳理一遍。parent即上面的NioEventLoopGroup對象,executor是在MultithreadEventExecutorGroup中初始化的ThreadPerTaskExecutor,selectorProvider是WindowsSelectorProvider,strategy是DefaultSelectStrategyFactory,rejectedExecutionHandler是RejectedExecutionHandler,queueFactory是null。
1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, 2 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, 3 EventLoopTaskQueueFactory queueFactory) { 4 super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), 5 rejectedExecutionHandler); 6 if (selectorProvider == null) { 7 throw new NullPointerException("selectorProvider"); 8 } 9 if (strategy == null) { 10 throw new NullPointerException("selectStrategy"); 11 } 12 provider = selectorProvider; 13 final SelectorTuple selectorTuple = openSelector(); 14 selector = selectorTuple.selector;// netty封裝的selector 15 unwrappedSelector = selectorTuple.unwrappedSelector;// java NIO原生的selector 16 selectStrategy = strategy; 17 }
能夠看到只是作了一些賦值,其中newTaskQueue方法建立的是MpscUnboundedArrayQueue隊列(多生產單消費無界隊列,mpsc是multi provider single consumer的首字母縮寫,即多個生產一個消費),繼續追查父類構造方法。
二、SingleThreadEventLoop構造器
調用父類構造器,給tailTasks賦值。
1 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, 2 boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, 3 RejectedExecutionHandler rejectedExecutionHandler) { 4 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler); 5 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue"); 6 }
三、SingleThreadEventExecutor構造器
在該構造方法中完成了剩餘變量的賦值,其中有兩個變量很重要:executor和taskQueue。前者負責建立Reactor線程,後者是實現串行無鎖化的任務隊列。
1 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, 2 boolean addTaskWakesUp, Queue<Runnable> taskQueue, 3 RejectedExecutionHandler rejectedHandler) { 4 super(parent); 5 this.addTaskWakesUp = addTaskWakesUp; 6 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; 7 this.executor = ThreadExecutorMap.apply(executor, this); 8 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); 9 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); 10 }
NioEventLoopGroup的對象引用最終記錄在了AbstractEventExecutor中:
1 protected AbstractEventExecutor(EventExecutorGroup parent) { 2 this.parent = parent; 3 }
NioeventLoop初始化完成以後的對象快照以下,左邊是子類,右邊是父類:
小結
本文詳細講述了netty中Reactor線程組概念模型的實現類 -- NioEventLoopGroup的實例化過程。NioEventLoopGroup和其內部數組元素NioEventLoop是netty通訊框架的基石,相信本文的內容對初學netty的園友有一點幫助。
下篇將研究ServerBootstrap的初始化過程,敬請期待。