類圖以下,EventLoop接口繼承了EventLoopGroup的接口,EventLoop和EventLoopGroup是io.netty.channel包中的類,爲了與Channel的事件進行交互,EventExecutorGroup是io.netty.util.concurrent包中的類,他直接繼承了JDK的java.util.concurrent包的ScheduledExecutorService接口,對ScheduledExecutorService接口進行加強,用於提供線程執行器。
在EventLoop接口中,只定義了一個方法parent(),用於返回他所屬的EventLoopGroup。java
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); }
EventLoop和EventLoopGroup以及Thread、Channel的關係以下圖:
這些關係是:數組
以NioEventLoopGroup爲例,下面是他的部分類結構圖,能夠看出它實現了LoopGroup接口。
咱們從new NioEventLoopGroup()
開始。
NioEventLoopGroup類ide
// 這邊沒傳線程數,就默認0 public NioEventLoopGroup() { this(0); } // 。。。部分略 // 這邊會調用父類MultithreadEventLoopGroup的構造函數 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
NioEventLoopGroup的幾個構造參數:函數
MultithreadEventLoopGroup類,對線程數的處理。oop
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { // 判斷傳過來的線程數量是否等於0,等於0從DEFAULT_EVENT_LOOP_THREADS取值 // 在靜態代碼塊中,從系統屬性中io.netty.eventLoopThreads獲取,缺省值爲CPU核心數*2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
MultithreadEventExecutorGroup類,建立EventExecutor數組,並監聽是否terminated源碼分析
// 傳遞EventExecutorChooserFactory,這個是用來從EventExecutor數組選擇一個EventExecutor protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 線程判斷 if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 設置executor,ThreadPerTaskExecutor中的execute方法以下 // threadFactory.newThread(command).start(),說明每次有任務,就建立一個線程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 設置子類,children至關於線程池的數組 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 經過executor和傳過來的參數實例化 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 若是失敗了,前面已經實例化的線程都要關閉 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 根據線程的數量建立chooser // 若是線程數是2的冪次方,則使用PowerOfTwoEventExecutorChooser // 不然使用GenericEventExecutorChooser chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // terminated的EventExecutor個數若是等於EventExecutor數組的個數,這個線程池就terminated if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 爲每一個EventExecutor都添加監聽 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 設置只讀的EventExecutor數組 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
newChild方法,在NioEventLoopGroup類中,這個後面的args參數,就是咱們以前傳過來的,其中Selector對象就是在這個方法裏經過SelectorProvider的openSelector()獲取的,EventLoop都有本身的Selector對象。this
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
總結一下,MultithreadEventExecutorGroup有個EventExecutor(NioEventLoop繼承了SingleThreadEventLoop,SingleThreadEventLoop實現了EventLoop接口,而EventLoop繼承了EventExecutor)類型的children,因此至關於NioEventLoopGroup內部有個EventLoop數組,這個EventLoop數組的大小,就是線程數。這些EventLoop的parent都是同一個Executor。spa
在Channel註冊的時候(這個後面講),會調用SingleThreadEventExecutor的execute方法。這個方法主要限制線程的數量,taskQueue隊列的大小最小爲16,若是系統屬性io.netty.eventLoop.maxPendingTasks沒有設置,就默認Integer.MAX_VALUE
。.net
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); // 當前調用線程是不是支撐 EventLoop 的線程 boolean inEventLoop = inEventLoop(); // 加入到隊列,若是隊列滿了,則根據策略拋出異常 addTask(task); // 若是不是當前線程 if (!inEventLoop) { // 啓動一個線程 startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
startThread方法,先判斷是否知足啓動的條件,實際的啓動在doStartThread方法裏。線程
private void startThread() { // 還沒啓動 if (state == ST_NOT_STARTED) { // 經過cas判斷是否正確更改狀態,這邊可能有多個線程進行啓動,可是隻要一個啓動就行了 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { // 開啓線程 doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } }
doStartThread,經過ThreadPerTaskExecutor線程池建立一個線程。
private void doStartThread() { assert thread == null; // 這個executor就是ThreadPerTaskExecutor,上面提過,每次有任務就建立一個線程 executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; // 更新最後執行時間 updateLastExecutionTime(); try { // 主要調用run方法,在這裏執行隊列的任務 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 省略 } } }); }
總結一下,一個EventLoop只有一個線程,並維護一個任務隊列,經過這個線程來處理任務。