網絡編程 - Netty(EventLoop)

EventLoop和EventLoopGroup的關係

類圖以下,EventLoop接口繼承了EventLoopGroup的接口,EventLoop和EventLoopGroup是io.netty.channel包中的類,爲了與Channel的事件進行交互,EventExecutorGroup是io.netty.util.concurrent包中的類,他直接繼承了JDK的java.util.concurrent包的ScheduledExecutorService接口,對ScheduledExecutorService接口進行加強,用於提供線程執行器。
image.png
在EventLoop接口中,只定義了一個方法parent(),用於返回他所屬的EventLoopGroup。java

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

EventLoop和EventLoopGroup以及Thread、Channel的關係以下圖:
image.png
這些關係是:數組

  1. 一個EventLoopGroup包含一個或者多個EventLoop;
  2. 一個EventLoop在它的生命週期內只和一個Thread綁定;
  3. 全部由EventLoop處理的I/O事件都將在它專有的Thread上被處理;
  4. 一個Channel在它的生命週期內只註冊於一個EventLoop;
  5. 一個EventLoop可能會被分配給一個或多個Channel。

源碼分析

初始化

以NioEventLoopGroup爲例,下面是他的部分類結構圖,能夠看出它實現了LoopGroup接口。
image.png
咱們從new NioEventLoopGroup()開始。
NioEventLoopGroup類ide

// 這邊沒傳線程數,就默認0
public NioEventLoopGroup() {
    this(0);
}
// 。。。部分略
// 這邊會調用父類MultithreadEventLoopGroup的構造函數
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

NioEventLoopGroup的幾個構造參數:函數

  • nThreads:線程數
  • executor:線程池
  • selectorProvider:用來獲取Selector
  • selectStrategyFactory:select操做的策略
  • RejectedExecutionHandler:線程池滿了,對新的任務應該用的策略

MultithreadEventLoopGroup類,對線程數的處理。oop

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // 判斷傳過來的線程數量是否等於0,等於0從DEFAULT_EVENT_LOOP_THREADS取值
    // 在靜態代碼塊中,從系統屬性中io.netty.eventLoopThreads獲取,缺省值爲CPU核心數*2
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

MultithreadEventExecutorGroup類,建立EventExecutor數組,並監聽是否terminated源碼分析

// 傳遞EventExecutorChooserFactory,這個是用來從EventExecutor數組選擇一個EventExecutor
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    // 線程判斷                                    
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // 設置executor,ThreadPerTaskExecutor中的execute方法以下
    // threadFactory.newThread(command).start(),說明每次有任務,就建立一個線程
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 設置子類,children至關於線程池的數組
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 經過executor和傳過來的參數實例化
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 若是失敗了,前面已經實例化的線程都要關閉
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 根據線程的數量建立chooser
    // 若是線程數是2的冪次方,則使用PowerOfTwoEventExecutorChooser
    // 不然使用GenericEventExecutorChooser
    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // terminated的EventExecutor個數若是等於EventExecutor數組的個數,這個線程池就terminated
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    // 爲每一個EventExecutor都添加監聽
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    // 設置只讀的EventExecutor數組
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

newChild方法,在NioEventLoopGroup類中,這個後面的args參數,就是咱們以前傳過來的,其中Selector對象就是在這個方法裏經過SelectorProvider的openSelector()獲取的,EventLoop都有本身的Selector對象。this

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

總結一下,MultithreadEventExecutorGroup有個EventExecutor(NioEventLoop繼承了SingleThreadEventLoop,SingleThreadEventLoop實現了EventLoop接口,而EventLoop繼承了EventExecutor)類型的children,因此至關於NioEventLoopGroup內部有個EventLoop數組,這個EventLoop數組的大小,就是線程數。這些EventLoop的parent都是同一個Executor。spa

execute

在Channel註冊的時候(這個後面講),會調用SingleThreadEventExecutor的execute方法。這個方法主要限制線程的數量,taskQueue隊列的大小最小爲16,若是系統屬性io.netty.eventLoop.maxPendingTasks沒有設置,就默認Integer.MAX_VALUE.net

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    
    // 當前調用線程是不是支撐 EventLoop 的線程
    boolean inEventLoop = inEventLoop();
    // 加入到隊列,若是隊列滿了,則根據策略拋出異常
    addTask(task);
    // 若是不是當前線程
    if (!inEventLoop) {
        // 啓動一個線程
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

startThread方法,先判斷是否知足啓動的條件,實際的啓動在doStartThread方法裏。線程

private void startThread() {
    // 還沒啓動
    if (state == ST_NOT_STARTED) {
        // 經過cas判斷是否正確更改狀態,這邊可能有多個線程進行啓動,可是隻要一個啓動就行了
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                // 開啓線程
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

doStartThread,經過ThreadPerTaskExecutor線程池建立一個線程。

private void doStartThread() {
    assert thread == null;
    // 這個executor就是ThreadPerTaskExecutor,上面提過,每次有任務就建立一個線程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            // 更新最後執行時間
            updateLastExecutionTime();
            try {
                // 主要調用run方法,在這裏執行隊列的任務
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 省略
            }
        }
    });
}

總結一下,一個EventLoop只有一個線程,並維護一個任務隊列,經過這個線程來處理任務。

相關文章
相關標籤/搜索