Netty系列(一):NioEventLoopGroup源碼解析

前言

對於NioEventLoopGroup這個對象,在個人理解裏面它就和ThreadGroup相似,NioEventLoopGroup中有一堆NioEventLoop小弟,ThreadGroup中有一堆Thread小弟,真正意義上幹活的都是NioEventLoopThread這兩個小弟。下面的文章你們能夠類比下進行閱讀,應該會很容易弄懂的。(本文基於netty-4.1.32.Final)php

NioEventLoopGroup

這裏我們能夠從NioEventLoopGroup最簡單的無參構造函數開始。java

1    public NioEventLoopGroup() {
2        this(0);
3    }
複製代碼

一步步往下走,能夠發現最終調用到構造函數:git

1    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
2                             final SelectStrategyFactory selectStrategyFactory)
 
{
3        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
4    }
複製代碼

參數說明:github

  1. nThreads:在整個方法鏈的調用過程當中,其值到這裏爲止一直爲0,在沒有主動配置的狀況下後面會進行設置。若配置io.netty.eventLoopThreads系統環境變量,則優先考慮,不然設置成爲CPU核心數*2
  2. executor: 到目前爲止是null
  3. selectorProvider: 這裏爲JDK的默認實現SelectorProvider.provider()
  4. selectStrategyFactory:這裏的值是DefaultSelectStrategyFactory的一個實例SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
  5. RejectedExecutionHandlers:這裏是個拒絕策略,這裏默認的實現是隊列溢出時拋出RejectedExecutionException異常。

MultithreadEventLoopGroup

繼續往下面走,調用父類MultithreadEventLoopGroup中的構造函數:數組

1    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
2        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
3    }
複製代碼

這裏能夠看到判斷nThreads == 0後就會給其附上一個默認值。繼續走,調用父類MultithreadEventExecutorGroup中的構造方法。bash

1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
2        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
3    }
複製代碼
DefaultEventExecutorChooserFactory

這裏有個關注的點,DefaultEventExecutorChooserFactory。這是一個chooserFactory,用來生產EventExecutorChooser選擇器的。而EventExecutorChooser的功能是用來選擇哪一個EventExecutor去執行我們的任務。我們從下面的代碼中能夠觀察到DefaultEventExecutorChooserFactory一共給我們提供了兩種策略。併發

1    public EventExecutorChooser newChooser(EventExecutor[] executors) {
2        if (isPowerOfTwo(executors.length)) {
3            return new PowerOfTwoEventExecutorChooser(executors);
4        } else {
5            return new GenericEventExecutorChooser(executors);
6        }
7    }
複製代碼

這裏的策略也很簡單:app

  1. 若是給的線程數是2^n個,那麼選擇PowerOfTwoEventExecutorChooser這個選擇器,由於這樣能夠採用位運算去獲取執行任務的EventExecutor
1        public EventExecutor next() {
2            return executors[idx.getAndIncrement() & executors.length - 1];
3        }
複製代碼
  1. GenericEventExecutorChooser選擇器,這裏採用的是取模的方式去獲取執行任務的EventExecutor
1        public EventExecutor next() {
2            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
3        }
複製代碼

相比而言,位運算的效率要比取模的效率高,因此我們在自定義線程數的時候,最好設置成爲2^n個線程數。ide


幹正事

到達最終調用的函數函數

 1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
2                                            EventExecutorChooserFactory chooserFactory, Object... args)
 
{
3        if (nThreads <= 0) {
4            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
5        }
6
7        if (executor == null) {
8            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
9        }
10
11        children = new EventExecutor[nThreads];
12
13        for (int i = 0; i < nThreads; i ++) {
14            boolean success = false;
15            try {
16                children[i] = newChild(executor, args);
17                success = true;
18            } catch (Exception e) {
19                // TODO: Think about if this is a good exception type
20                throw new IllegalStateException("failed to create a child event loop", e);
21            } finally {
22                if (!success) {
23                    for (int j = 0; j < i; j ++) {
24                        //建立NioEventLoop失敗後進行資源的一些釋放
25                        children[j].shutdownGracefully();
26                    }
27
28                    for (int j = 0; j < i; j ++) {
29                        EventExecutor e = children[j];
30                        try {
31                            while (!e.isTerminated()) {
32                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
33                            }
34                        } catch (InterruptedException interrupted) {
35                            // Let the caller handle the interruption.
36                            Thread.currentThread().interrupt();
37                            break;
38                        }
39                    }
40                }
41            }
42        }
43       //這裏能夠去看下上面對於 DefaultEventExecutorChooserFactory的一些介紹
44        chooser = chooserFactory.newChooser(children);
45
46        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
47            @Override
48            public void operationComplete(Future<Object> future) throws Exception {
49                if (terminatedChildren.incrementAndGet() == children.length) {
50                    terminationFuture.setSuccess(null);
51                }
52            }
53        };
54
55        for (EventExecutor e: children) {
56            // 給每個成功建立的EventExecutor 綁定一個監聽終止事件
57            e.terminationFuture().addListener(terminationListener);
58        }
59
60        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
61        Collections.addAll(childrenSet, children);
62        // 弄一個只讀的EventExecutor數組,方便後面快速迭代,不會拋出併發修改異常
63        readonlyChildren = Collections.unmodifiableSet(childrenSet);
64    }
複製代碼

從上面的代碼能夠觀察到,等了好久的executor 在這裏終於給其賦值了,其值爲ThreadPerTaskExecutor的一個實例對象,這一塊的初始化賦值都是很簡單的,幹活調用的是以下方法:

1    public void execute(Runnable command) {
2        threadFactory.newThread(command).start();
3    }
複製代碼

對這一塊不是很瞭解的能夠去查閱下線程池有關的資料,我們重點關注一下newChild這個方法,能夠說是上面整個流程中的重點:

newChild

newChild這個方法在NioEventLoopGroup中被重寫了:

1    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
2        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
3            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
4    }
複製代碼

細心的小夥伴能夠觀察到,這裏有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler這個三個參數,實際上就是本文最開始初始化的三個實例對象(能夠翻閱到頂部查看一下)。
繼續往下走流程:

 1    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
2                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
3        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
4        if (selectorProvider == null) {
5            throw new NullPointerException("selectorProvider");
6        }
7        if (strategy == null) {
8            throw new NullPointerException("selectStrategy");
9        }
10        provider = selectorProvider;
11        final SelectorTuple selectorTuple = openSelector();
12        selector = selectorTuple.selector;
13        unwrappedSelector = selectorTuple.unwrappedSelector;
14        selectStrategy = strategy;
15    }
複製代碼

在上面的代碼片斷中除了調用父類的構造器以外就進行了參數的判空和簡單的賦值。這裏openSelector方法調用後返回SelectorTuple實例主要是爲了能同時獲得包裝先後的selectorunwrappedSelector

 1    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
2                                        boolean addTaskWakesUp, int maxPendingTasks,
3                                        RejectedExecutionHandler rejectedHandler) {
4        super(parent);
5        this.addTaskWakesUp = addTaskWakesUp;
6        this.maxPendingTasks = Math.max(16, maxPendingTasks);
7        this.executor = ObjectUtil.checkNotNull(executor, "executor");
8        taskQueue = newTaskQueue(this.maxPendingTasks);
9        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10    }
複製代碼

這裏會有一個taskQueue隊列的初始化(Queue<Runnable> taskQueue),看名字就知道,這個隊列裏面放着的是我們要去執行的任務。這裏的初始化方法newTaskQueueNioEventLoop中重寫了的。具體以下:

1    protected Queue<Runnable> newTaskQueue(int maxPendingTasks{
2        // This event loop never calls takeTask()
3        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
4                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
5    }
複製代碼

這裏生成的是一個MPSC隊列(Multi Producer Single Consumer),這是一個多生產者單消費的無鎖隊列,支持併發。從字面意思上就能夠觀察到這個隊列效率應該是蠻高的。這裏的maxPendingTasks值爲Integer.MAX_VALUE。而後最終生成的是MpscUnboundedArrayQueue這樣一個無邊界的隊列。

這樣newChild這個方法到這裏就走完了。


terminationListener

簡單介紹下這個環節,在上面的建立NioEventLoopGroup有個環節是給每一個NioEventLoop兒子綁定一個terminationListener監聽事件

1        for (EventExecutor e: children) {
2            e.terminationFuture().addListener(terminationListener);
3        }
複製代碼

這個事件的回調方法是:

1            @Override
2            public void operationComplete(Future<Object> future) throws Exception {
3                if (terminatedChildren.incrementAndGet() == children.length) {
4                    terminationFuture.setSuccess(null);
5                }
6            }
複製代碼

在每個NioEventLoop關閉後,就會回調這個方法,而後給NioEventLoopGroup實例中的terminatedChildren字段自增1,並與初始化成功的NioEventLoop的總個數進行比較,若是
terminatedChildren的值與NioEventLoop的總個數相等,則調用bossGroup.terminationFuture().get()方法就不會阻塞,並正常返回null
一樣,future.channel().closeFuture().sync()這段代碼也將不會阻塞住了,調用sync.get()也會返回null
下面給一段測試代碼,完整示例你們能夠到個人github中去獲取:

terminationListener_test
terminationListener_test

上面的代碼只是一個簡單的測試,後面還有別的發現的話會繼續在github中與你們一塊兒分享~


End

相關文章
相關標籤/搜索