Netty 源碼 NioEventLoop(一)初始化

Netty 源碼 NioEventLoop(一)初始化

Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)html

Netty 基於事件驅動模型,使用不一樣的事件來通知咱們狀態的改變或者操做狀態的改變。它定義了在整個鏈接的生命週期裏當有事件發生的時候處理的核心抽象。java

Channel 爲 Netty 網絡操做抽象類,EventLoop 主要是爲 Channel 處理 I/O 操做,二者配合參與 I/O 操做。算法

EventLoopGroup 是一個 EventLoop 的分組,它能夠獲取到一個或者多個 EventLoop 對象,所以它提供了迭代出 EventLoop 對象的方法。設計模式

1、EventLoop 類圖

NioEventLoop 類圖

其中 Executor、ExecutorService、AbstractExecutorService、ScheduledExecutorService 屬於 JDK 定義的規範,Netty 實現了本身的自定義線程池。網絡

  • EventExecutorGroup 提供了 next() 方法,除此以外是線程池的生命週期方法,如 shutdownGracefully。
  • EventLoopGroup 提供了 register 方法,將 channel 綁定到線程上。
  • EventExecutor 繼承自 EventExecutorGroup,提供了 inEventLoop 方法。
  • EventLoop 繼承自 EventLoopGroup,提供了 register 註冊和 parent 方法。
  • MultithreadEventExecutorGroup 基於多線程的 EventExecutor (事件執⾏行器)的分組抽象類
  • ThreadPerTaskExecutor 實現 Executor 接⼝口,每一個任務一個線程的執行器實現類

2、NioEventLoopGroup 初始化

NioEventLoopGroup 初始化

(1) NioEventLoopGroup多線程

NioEventLoopGroup 構造方法中最重要的一件事是建立子線程 NioEventLoop,建立完成後子線程並未啓動,該線程在 channel 註冊時啓動。app

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
         final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory,
         final RejectedExecutionHandler rejectedExecutionHandler) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
  • nThreads, threadFactory 前兩個參數用於建立線程池,若是 nThreads=0 則默認爲 CPU 核數的 2 倍
  • chooserFactory 用於循環獲取 NioEventLoopGroup 中的下一個 NioEventLoop 的算法。2 的冪次方使用位運算
  • selectorProvider, selectStrategyFactory 後二個參數用於建立 selector
  • rejectedExecutionHandler 異常處理的 Handler

(2) MultithreadEventExecutorGroupide

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
        EventExecutorChooserFactory chooserFactory, Object... args) {
    // 1. executor 用於建立一個子線程
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 2. 建立全部的子線程 children
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 有異常則須要銷燬資源
            }
        }
    }

    // 3. chooser 用於循環獲取 children 中的下一個元素的算法。2 的冪次方使用位運算
    chooser = chooserFactory.newChooser(children);

    // 4. children 初始化完成則設置 setSuccess 爲 true
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 5. 返回一個只讀的 children 暴露給開發者 
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

MultithreadEventExecutorGroup 建立了全部的子線程,其中最重要的方法是 newChild(executor, args)oop

(3) newChildthis

// NioEventLoopGroup 建立子線程 NioEventLoop
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

(4) NioEventLoop

NioEventLoop 構造時就建立了一個 selector 對象。下面看一個 NioEventLoop 的建立過程。

// 建立了一個 selector 對象
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

// 負責 channel 的註冊
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

// 負責執行任務
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

經過以上步驟就建立了 NioEventLoop 對象,但這個線程並未啓動。很顯然在 channel 註冊到 NioEventLoop 時會啓動該線程。

3、NioEventLoop 執行任務

(1) execute

在 eventLoop 執行 execute 方法時,若是線程還未啓動則須要先啓動線程。

// SingleThreadEventExecutor
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    // 是否在 EventLoop 線程中
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        // 啓動線程
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    // 喚醒線程
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

!addTaskWakesUp 表示「添加任務時,是否喚醒線程」?!可是,怎麼使⽤用取反了。這樣反倒變成了,「添加任務時,是否【不】喚醒線程」。具體的緣由是爲何呢?

真正的意思是,「添加任務後,任務是否會自動致使線程喚醒」。爲何呢?

  • 對於 Nio 使用的 NioEventLoop ,它的線程執行任務是基於 Selector 監聽感興趣的事件,因此當任務添加到 taskQueue 隊列中時,線程是無感知的,因此須要調用 #wakeup(boolean inEventLoop) 方法,進行主動的喚醒。

  • 對於 Oio 使用的 ThreadPerChannelEventLoop,它的線程執行是基於 taskQueue 隊列列監聽(阻塞拉取)事件和任務,因此當任務添加到 taskQueue 隊列中時,線程是可感知的,至關於說,進行被動的喚醒。

(2) startThread 啓動線程

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

// 真正啓動線程,執行的是 NioEventLoop 中的 run 任務
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // run 方法由子類 NioEventLoop 實現,是一個死循環
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 若是執行到這裏,則說明須要關閉該線程
            }
        }
    });
}

executor 默認是在 MultithreadEventExecutorGroup 的構造方法完成初始化的,代碼以下:

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

ThreadPerTaskExecutor 經過線程工廠 threadFactory 建立一個線程並啓動,至此 NioEventLoop 開始工做了。在這個簡單的類中使用的代理模式和命令模式兩種設計模式。

(3) 線程狀態變化

SingleThreadEventExecutor 維護了線程的狀態字段 state。

線程狀態變化


天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索