首先咱們來看一下 Reactor 的線程模型.
Reactor 的線程模型有三種:數組
首先來看一下 單線程模型:服務器
所謂單線程, 即 acceptor 處理和 handler 處理都在一個線程中處理. 這個模型的壞處顯而易見: 當其中某個 handler 阻塞時, 會致使其餘全部的 client 的 handler 都得不到執行, 而且更嚴重的是, handler 的阻塞也會致使整個服務不能接收新的 client 請求(由於 acceptor 也被阻塞了). 由於有這麼多的缺陷, 所以單線程 Reactor 模型用的比較少.多線程
那麼什麼是多線程模型呢? Reactor 的多線程模型與單線程模型的區別就是 acceptor 是一個單獨的線程處理, 而且有一組特定的 NIO 線程來負責各個客戶端鏈接的 IO 操做. Reactor 多線程模型以下:ide
Reactor 多線程模型 有以下特色:oop
接下來咱們再來看一下 Reactor 的主從多線程模型.fetch
通常狀況下, Reactor 的多線程模式已經能夠很好的工做了, 可是咱們考慮一下以下狀況: 若是咱們的服務器須要同時處理大量的客戶端鏈接請求或咱們須要在客戶端鏈接時, 進行一些權限的檢查, 那麼單線程的 Acceptor 頗有可能就處理不過來, 形成了大量的客戶端不能鏈接到服務器.spa
Reactor 的主從多線程模型就是在這樣的狀況下提出來的, 它的特色是: 服務器端接收客戶端的鏈接請求再也不是一個線程, 而是由一個獨立的線程池組成. 它的線程模型以下:線程
能夠看到, Reactor 的主從多線程模型和 Reactor 多線程模型很相似, 只不過 Reactor 的主從多線程模型的 acceptor 使用了線程池來處理大量的客戶端請求.code
咱們介紹了三種 Reactor 的線程模型, 那麼它們和 NioEventLoopGroup 又有什麼關係呢? 其實, 不一樣的設置 NioEventLoopGroup
的方式就對應了不一樣的 Reactor 的線程模型.繼承
來看一下下面的例子:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup) .channel(NioServerSocketChannel.class) ...
注意, 咱們實例化了一個 NioEventLoopGroup
, 構造器參數是1, 表示 NioEventLoopGroup
的線程池大小是1.
而後接着咱們調用 b.group(bossGroup)
設置了服務器端的 EventLoopGroup
. 有些朋友可能會有疑惑: 我記得在啓動服務器端的 Netty 程序時, 是須要設置 bossGroup
和 workerGroup
的, 爲何這裏就只有一個 bossGroup
?
其實很簡單, ServerBootstrap
重寫了 group
方法:
@Override public ServerBootstrap group(EventLoopGroup group) { return group(group, group); }
所以當傳入一個 group 時, 那麼 bossGroup 和 workerGroup 就是同一個 NioEventLoopGroup
了. 而且這個 NioEventLoopGroup
只有一個線程, 這樣就會致使 Netty 中的 acceptor 和後續的全部客戶端鏈接的 IO 操做都是在一個線程中處理的. 那麼對應到 Reactor 的線程模型中, 咱們這樣設置 NioEventLoopGroup
時, 就至關於 Reactor 單線程模型.
同理, 再來看一下下面的例子:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
bossGroup 中只有一個線程, 而 workerGroup 中的線程是 CPU 核心數乘以2, 所以對應的到 Reactor 線程模型中, 咱們知道, 這樣設置的 NioEventLoopGroup 其實就是 Reactor 多線程模型.
EventLoopGroup bossGroup = new NioEventLoopGroup(4); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
服務器端的 ServerSocketChannel
只綁定到了 bossGroup 中的一個線程, 所以在調用 Java NIO 的 Selector.select
處理客戶端的鏈接請求時, 其實是在一個線程中的, 因此對只有一個服務的應用來講, bossGroup 設置多個線程是沒有什麼做用的, 反而還會形成資源浪費.
bossGroup 是用於服務端的 accept, 即用於處理客戶端的鏈接請求. 咱們能夠把 Netty 比做一個飯店, bossGroup 就像一個像一個前臺接待, 當客戶來到飯店吃時, 接待員就會引導顧客就坐, 爲顧客端茶送水等.
而 workerGroup, 其實就是實際上幹活的, 它們負責客戶端鏈接通道的 IO 操做: 當接待員 招待好顧客後, 就能夠稍作休息, 而此時後廚裏的廚師們(workerGroup)就開始忙碌地準備飯菜了.
關於 bossGroup 與 workerGroup 的關係, 咱們能夠用以下圖來展現:
首先, 服務器端 bossGroup 不斷地監聽是否有客戶端的鏈接, 當發現有一個新的客戶端鏈接到來時, bossGroup 就會爲此鏈接初始化各項資源, 而後從 workerGroup 中選出一個 EventLoop 綁定到此客戶端鏈接中. 那麼接下來的服務器與客戶端的交互過程就所有在此分配的 EventLoop 中了.
NioEventLoop 繼承於 SingleThreadEventLoop, 而 SingleThreadEventLoop 又繼承於 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中對本地線程的抽象, 它內部有一個 Thread thread 屬性, 存儲了一個本地 Java 線程. 所以咱們能夠認爲, 一個 NioEventLoop 其實和一個特定的線程綁定, 而且在其生命週期內, 綁定的線程都不會再改變.
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
在 AbstractScheduledEventExecutor 中, Netty 實現了 NioEventLoop 的 schedule 功能, 即咱們能夠經過調用一個 NioEventLoop 實例的 schedule 方法來運行一些定時任務. 而在 SingleThreadEventLoop 中, 又實現了任務隊列的功能, 經過它, 咱們能夠調用一個 NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 並由 NioEventLoop 進行調度執行.
一般來講, NioEventLoop 肩負着兩種任務, 第一個是做爲 IO 線程, 執行與 Channel 相關的 IO 操做, 包括 調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等; 而第二個任務是做爲任務隊列, 執行 taskQueue 中的任務, 例如用戶調用 eventLoop.schedule 提交的定時任務也是這個線程執行的.
NioEventLoopGroup 就像一個線程池, 負責爲每一個新建立的 Channel 分配一個 EventLoop. 而 EventLoop 就是一個線程, 負責執行用戶任務和 IO 事件.
咱們已經提到過, 在Netty 中, 一個 NioEventLoop 一般須要肩負起兩種任務, 第一個是做爲 IO 線程, 處理 IO 操做; 第二個就是做爲任務線程, 處理 taskQueue 中的任務.
NioEventLoop 繼承於 SingleThreadEventExecutor, 而 SingleThreadEventExecutor 中有一個 Queue<Runnable> taskQueue 字段, 用於存放添加的 Task. 在 Netty 中, 每一個 Task 都使用一個實現了 Runnable 接口的實例來表示.
例如當咱們須要將一個 Runnable 添加到 taskQueue 中時, 咱們能夠進行以下操做:
EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("Hello, Netty!"); } });
當一個任務被添加到 taskQueue 後, 它是怎麼被 EventLoop 執行的呢?
讓咱們回到 NioEventLoop.run() 方法中, 在這個方法裏, 會分別調用 processSelectedKeys() 和 runAllTasks() 方法, 來進行 IO 事件的處理和 task 的處理.
runAllTasks 方法有兩個重載的方法, 一個是無參數的, 另外一個有一個參數的. 首先來看一下無參數的 runAllTasks:
protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }
在此方法的一開始調用的 fetchFromScheduledTaskQueue()
其實就是將 scheduledTaskQueue
中已經能夠執行的(即定時時間已到的 schedule 任務) 拿出來並添加到 taskQueue
中, 做爲可執行的 task 等待被調度執行.
private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }
接下來 runAllTasks()
方法就會不斷調用 task = pollTask()
從 taskQueue 中獲取一個可執行的 task, 而後調用它的 run() 方法來運行此 task.