對於NioEventLoopGroup
這個對象,在個人理解裏面它就和ThreadGroup
相似,NioEventLoopGroup
中有一堆NioEventLoop
小弟,ThreadGroup
中有一堆Thread
小弟,真正意義上幹活的都是NioEventLoop
和Thread
這兩個小弟。下面的文章你們能夠類比下進行閱讀,應該會很容易弄懂的。(本文基於netty-4.1.32.Final)php
這裏我們能夠從NioEventLoopGroup
最簡單的無參構造函數開始。java
1 public NioEventLoopGroup() {
2 this(0);
3 }
複製代碼
一步步往下走,能夠發現最終調用到構造函數:git
1 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
2 final SelectStrategyFactory selectStrategyFactory) {
3 super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
4 }
複製代碼
參數說明:github
io.netty.eventLoopThreads
系統環境變量,則優先考慮,不然設置成爲CPU核心數*2
。null
。SelectorProvider.provider()
。SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
。RejectedExecutionException
異常。繼續往下面走,調用父類MultithreadEventLoopGroup
中的構造函數:數組
1 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
3 }
複製代碼
這裏能夠看到判斷nThreads == 0
後就會給其附上一個默認值。繼續走,調用父類MultithreadEventExecutorGroup
中的構造方法。bash
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
2 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
3 }
複製代碼
這裏有個關注的點,DefaultEventExecutorChooserFactory
。這是一個chooserFactory,用來生產EventExecutorChooser
選擇器的。而EventExecutorChooser
的功能是用來選擇哪一個EventExecutor
去執行我們的任務。我們從下面的代碼中能夠觀察到DefaultEventExecutorChooserFactory
一共給我們提供了兩種策略。併發
1 public EventExecutorChooser newChooser(EventExecutor[] executors) {
2 if (isPowerOfTwo(executors.length)) {
3 return new PowerOfTwoEventExecutorChooser(executors);
4 } else {
5 return new GenericEventExecutorChooser(executors);
6 }
7 }
複製代碼
這裏的策略也很簡單:app
PowerOfTwoEventExecutorChooser
這個選擇器,由於這樣能夠採用位運算去獲取執行任務的EventExecutor
。1 public EventExecutor next() {
2 return executors[idx.getAndIncrement() & executors.length - 1];
3 }
複製代碼
GenericEventExecutorChooser
選擇器,這裏採用的是取模的方式去獲取執行任務的EventExecutor
。1 public EventExecutor next() {
2 return executors[Math.abs(idx.getAndIncrement() % executors.length)];
3 }
複製代碼
相比而言,位運算的效率要比取模的效率高,因此我們在自定義線程數的時候,最好設置成爲2^n個線程數。ide
到達最終調用的函數函數
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
2 EventExecutorChooserFactory chooserFactory, Object... args) {
3 if (nThreads <= 0) {
4 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
5 }
6
7 if (executor == null) {
8 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
9 }
10
11 children = new EventExecutor[nThreads];
12
13 for (int i = 0; i < nThreads; i ++) {
14 boolean success = false;
15 try {
16 children[i] = newChild(executor, args);
17 success = true;
18 } catch (Exception e) {
19 // TODO: Think about if this is a good exception type
20 throw new IllegalStateException("failed to create a child event loop", e);
21 } finally {
22 if (!success) {
23 for (int j = 0; j < i; j ++) {
24 //建立NioEventLoop失敗後進行資源的一些釋放
25 children[j].shutdownGracefully();
26 }
27
28 for (int j = 0; j < i; j ++) {
29 EventExecutor e = children[j];
30 try {
31 while (!e.isTerminated()) {
32 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
33 }
34 } catch (InterruptedException interrupted) {
35 // Let the caller handle the interruption.
36 Thread.currentThread().interrupt();
37 break;
38 }
39 }
40 }
41 }
42 }
43 //這裏能夠去看下上面對於 DefaultEventExecutorChooserFactory的一些介紹
44 chooser = chooserFactory.newChooser(children);
45
46 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
47 @Override
48 public void operationComplete(Future<Object> future) throws Exception {
49 if (terminatedChildren.incrementAndGet() == children.length) {
50 terminationFuture.setSuccess(null);
51 }
52 }
53 };
54
55 for (EventExecutor e: children) {
56 // 給每個成功建立的EventExecutor 綁定一個監聽終止事件
57 e.terminationFuture().addListener(terminationListener);
58 }
59
60 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
61 Collections.addAll(childrenSet, children);
62 // 弄一個只讀的EventExecutor數組,方便後面快速迭代,不會拋出併發修改異常
63 readonlyChildren = Collections.unmodifiableSet(childrenSet);
64 }
複製代碼
從上面的代碼能夠觀察到,等了好久的executor 在這裏終於給其賦值了,其值爲ThreadPerTaskExecutor
的一個實例對象,這一塊的初始化賦值都是很簡單的,幹活調用的是以下方法:
1 public void execute(Runnable command) {
2 threadFactory.newThread(command).start();
3 }
複製代碼
對這一塊不是很瞭解的能夠去查閱下線程池有關的資料,我們重點關注一下newChild
這個方法,能夠說是上面整個流程中的重點:
newChild
這個方法在NioEventLoopGroup
中被重寫了:
1 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
2 return new NioEventLoop(this, executor, (SelectorProvider) args[0],
3 ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
4 }
複製代碼
細心的小夥伴能夠觀察到,這裏有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler這個三個參數,實際上就是本文最開始初始化的三個實例對象(能夠翻閱到頂部查看一下)。
繼續往下走流程:
1 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
2 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
3 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
4 if (selectorProvider == null) {
5 throw new NullPointerException("selectorProvider");
6 }
7 if (strategy == null) {
8 throw new NullPointerException("selectStrategy");
9 }
10 provider = selectorProvider;
11 final SelectorTuple selectorTuple = openSelector();
12 selector = selectorTuple.selector;
13 unwrappedSelector = selectorTuple.unwrappedSelector;
14 selectStrategy = strategy;
15 }
複製代碼
在上面的代碼片斷中除了調用父類的構造器以外就進行了參數的判空和簡單的賦值。這裏openSelector
方法調用後返回SelectorTuple
實例主要是爲了能同時獲得包裝先後的selector
與unwrappedSelector
。
1 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
2 boolean addTaskWakesUp, int maxPendingTasks,
3 RejectedExecutionHandler rejectedHandler) {
4 super(parent);
5 this.addTaskWakesUp = addTaskWakesUp;
6 this.maxPendingTasks = Math.max(16, maxPendingTasks);
7 this.executor = ObjectUtil.checkNotNull(executor, "executor");
8 taskQueue = newTaskQueue(this.maxPendingTasks);
9 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10 }
複製代碼
這裏會有一個taskQueue
隊列的初始化(Queue<Runnable> taskQueue
),看名字就知道,這個隊列裏面放着的是我們要去執行的任務。這裏的初始化方法newTaskQueue
在NioEventLoop
中重寫了的。具體以下:
1 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
2 // This event loop never calls takeTask()
3 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
4 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
5 }
複製代碼
這裏生成的是一個MPSC隊列(Multi Producer Single Consumer),這是一個多生產者單消費的無鎖隊列,支持併發。從字面意思上就能夠觀察到這個隊列效率應該是蠻高的。這裏的maxPendingTasks
值爲Integer.MAX_VALUE
。而後最終生成的是MpscUnboundedArrayQueue
這樣一個無邊界的隊列。
這樣newChild
這個方法到這裏就走完了。
簡單介紹下這個環節,在上面的建立NioEventLoopGroup
有個環節是給每一個NioEventLoop
兒子綁定一個terminationListener監聽事件
1 for (EventExecutor e: children) {
2 e.terminationFuture().addListener(terminationListener);
3 }
複製代碼
這個事件的回調方法是:
1 @Override
2 public void operationComplete(Future<Object> future) throws Exception {
3 if (terminatedChildren.incrementAndGet() == children.length) {
4 terminationFuture.setSuccess(null);
5 }
6 }
複製代碼
在每個NioEventLoop
關閉後,就會回調這個方法,而後給NioEventLoopGroup
實例中的terminatedChildren
字段自增1,並與初始化成功的NioEventLoop
的總個數進行比較,若是terminatedChildren
的值與NioEventLoop
的總個數相等,則調用bossGroup.terminationFuture().get()
方法就不會阻塞,並正常返回null
。
一樣,future.channel().closeFuture().sync()
這段代碼也將不會阻塞住了,調用sync.get()
也會返回null
。
下面給一段測試代碼,完整示例你們能夠到個人github中去獲取:
上面的代碼只是一個簡單的測試,後面還有別的發現的話會繼續在github中與你們一塊兒分享~