NioEventLoop是netty及其重要的組成部件,它的首要職責就是爲註冊在它上的channels服務,發現這些channels上發生的新鏈接、讀寫等I/O事件,而後將事件轉交 channel 流水線處理。使用netty時,咱們首先要作的就是建立NioEventLoopGroup,這是一組NioEventLoop的集合,相似線程池與線程池組。一般,服務端會建立2個group,一個叫作bossGroup,一個叫作workerGroup。bossGroup負責監聽綁定的端口,發現端口上的新鏈接,初始化後交由workerGroup處理後續的讀寫事件。java
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
咱們先看看bossGroup和workerGroup的構造方法。windows
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } 除此以外,還有多達8種構造方法,這些構造方法能夠指定5種參數: 一、最大線程數量。若是指定爲0,那麼Netty會將線程數量設置爲CPU邏輯處理器數量的2倍 二、線程工廠。要求線程工廠類必須實現java.util.concurrent.ThreadFactory接口。若是沒有指定線程工廠,那麼默認DefaultThreadFactory。 三、SelectorProvider。若是沒有指定SelectorProvider,那麼默認的SelectorProvider爲SelectorProvider.provider()。 四、SelectStrategyFactory。若是沒有指定則默認爲DefaultSelectStrategyFactory.INSTANCE 五、RejectedExecutionHandler。拒絕策略處理類,若是這個EventLoopGroup已被關閉,那麼以後提交的Runnable任務會默認調用RejectedExecutionHandler的reject方法進行處理。若是沒有指定,則默認調用拒絕策略。
最終,NioEventLoopGroup會重載到父類mUltiThreadEventExecutorGroup的構造方法上,這裏省略了一些健壯性代碼。數組
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) { // 步驟1 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 步驟2 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } // 步驟3 chooser = chooserFactory.newChooser(children); // 步驟4 final FutureListener<Object> terminationListener = future -> { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 步驟5 Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
第一個步驟是建立線程池executor。從workerGroup構造方法可知,默認傳進來的executor爲null,因此首先建立executor。newDefaultThreadFactory的做用是設置線程的前綴名和線程優先級,默認狀況下,前綴名是nioEventLoopGroup-x-y這樣的命名規則,而線程優先級則是5,處於中間位置。
建立完newDefaultThreadFactory後,進入到ThreadPerTaskExecutor。它直接實現了juc包的線程池頂級接口,從構造方法能夠看到它只是簡單的把factory賦值給本身的成員變量。而它實現的接口方法調用了threadFactory的newThread方法。從名字能夠看出,它構造了一個thread,並當即啓動thread。app
public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); }
那麼咱們回過頭來看下DefaultThreadFactory的newThread方法,發現他建立了一個FastThreadLocalThread。這是netty自定義的一個線程類,顧名思義,netty認爲它的性能更快。關於它的解析留待之後。這裏步驟1建立線程池就完成了。總的來講他與咱們一般使用的線程池不太同樣,不設置線程池的線程數和任務隊列,而是來一個任務啓動一個線程。(問題:那任務一多豈不是直接線程爆炸?)負載均衡
@Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); return t; } protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); }
步驟2是建立workerGroup中的NioEventLoop。在示例代碼中,傳進來的線程數是0,顯然不可能真正只建立0個nioEventLoop線程。在調用父類MultithreadEventLoopGroup構造函數時,對線程數進行了判斷,若爲0,則傳入默認線程數,該值默認爲2倍CPU核心數ide
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // 靜態代碼塊初始化DEFAULT+EVENT_LOOP_THREADS static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); }
接下來是經過newChild方法爲每個EventExecutor建立一個對應的NioEventLoop。這個方法傳入了一些args到NioEventLoop中,它們分別是:函數
進入NioEventLoop的構造函數,以下:oop
NioEventLoop構造函數 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } // 父類構造函數 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue"); rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler"); }
首先調用一個newTaskQueue方法建立一個任務隊列。這是一個mpsc即多生產者單消費者的無鎖隊列。以後調用父類的構造函數,在父類的構造函數中,將NioEventLoopGroup設置爲本身的parent,並經過匿名內部類建立了這樣一個Executor————經過ThreadPerTaskExecutor執行傳進來的任務,而且在執行時將當前線程與NioEventLoop綁定。其餘屬性也一一設置。
在nioEventLoop構造函數中,咱們發現建立了一個selector,不妨看一看netty對它的包裝。性能
unwrappedSelector = provider.openSelector(); if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); }
首先看到netty定義了一個常量DISABLE_KEY_SET_OPTIMIZATION,若是這個常量設置爲true,也即不對keyset進行優化,則直接返回未包裝的selector。那麼netty對selector進行了哪些優化?優化
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } }
往下,咱們看到了一個叫作selectedSelectionKeySet的類,點進去能夠看到,它繼承了AbstractSet,然而它的成員變量卻讓咱們想到了ArrayList,再看看它定義的方法,除了不支持remove和contains外,活脫脫一個簡化版的ArrayList,甚至也支持擴容。
沒錯,netty確實經過反射的方式,將selectionKey從Set替換爲了ArrayList。仔細一想,卻又以爲此番作法有些道理。衆所周知,雖然HashSet和ArrayList隨機查找的時間複雜度都是o(1),但相比數組直接經過偏移量定位,HashSet因爲須要Hash運算,時間消耗上又稍稍遜色了些。再加上使用場景上,都是獲取selectionKey集合而後遍歷,Set去重的特性徹底用不上,也無怪乎追求性能的netty想要替換它了。
建立完workerGroup的NioEventLoop後,如何挑選一個nioEventLoop進行工做是netty接下來想要作的事。通常來講輪詢是一個很容易想到的方案,爲此須要建立一個相似負載均衡做用的線程選擇器。固然追求性能到喪心病狂的netty是不會輕易知足的。咱們看看netty在這樣常見的方案裏又作了哪些操做。
public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // PowerOfTwo public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } // Generic public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; }
能夠看到,netty根據workerGroup內線程的數量採起了2種不一樣的線程選擇器,當線程數x是2的冪次方時,能夠經過&(x-1)來達到對x取模的效果,其餘狀況則須要直接取模。這與hashmap強制設置容量爲2的冪次方有殊途同歸之妙。
步驟4就是添加一些保證健壯性而添加的監聽器了,這些監聽器會在EventLoop被關閉後獲得通知。
建立一個只讀的NioEventLoop線程組
到此NioEventLoopGroup及其包含的NioEventLoop組就建立完成了