在學習 Netty 的 EventLoop 線程模型以前,須要先了解 Java 的 Reactor 模式。
在網絡編程過程當中,服務器端最原始的方式就是經過一個循環來不斷的監聽端口是否有新的 socket 連接,若是有就直接處理,好比讀取 socket 輸入,寫入輸出等等,完成以後再開始下一次監聽。這種方式有一個最大的問題,當前請求沒有處理完以前,下一個請求只能阻塞着,直到當前請求處理完成,服務器吞吐量低。在併發量大的狀況下,效率很低。
天然的,咱們會想到使用線程來處理。當有新的 socket 連接時,建立一個線程,將後面的處理邏輯都交給前程處理,即一個請求(socket 連接)用一個線程處理,這樣就不會產生阻塞了。雖然這樣解決了吞吐量的問題,可是又帶來新的問題,即在併發量大的狀況下,會不斷的產生新的線程。反覆的線程建立會消耗大量的資源,而多個線程的上下文切換也會影響效率。
採用事件驅動的方式能夠很好的解決這些問題。當有事件發生,好比鏈接創建、數據可讀或可寫時,調用獨立的處理器進行處理,避免阻塞主流程,同時也可使用有限的線程來處理多個事件,避免了線程過多消耗大量資源。
Reactor 模型是一種事件驅動機制。通常的,應用程序經過主動調用某些 API 來完成處理,而 Reactor 卻相反,應用程序提供相應的接口(回調函數)註冊到 Reactor 上,若是有相應的事件發生,Reactor 將主動調用以前註冊的接口。Reactor 能夠同時接收多個請求,而後將它們分發到對應的處理器上。
Reactor 模型有如下幾個主要部分:
- Selector:事件通知器
- Handler:事件處理器
- SelectionKey:事件標識
接下來使用以前的 demo,經過 Java 相關 API 實現 Server 端來學習 Reactor 模型的幾種形態。
複用 demo 中 Netty 的 Client 端,鏈接創建時發送消息到 Server,接收 Server 返回的消息打印在控制檯,如下是 Client 端的代碼示例:
package com.niklai.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
 * Netty client used by all the Reactor demos: connects to localhost:9999,
 * sends one message on connect and logs whatever the server answers.
 */
public class Client {
    private static final Logger logger = LoggerFactory.getLogger(Client.class.getSimpleName());

    /**
     * Connects to the server and blocks until the channel is closed.
     * The event loop group is always shut down, even when sync() fails
     * (the original leaked its threads on an exception).
     */
    public static void init() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("localhost", 9999))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            // BUGFIX: a stray character ("點") after this statement made the
            // original snippet uncompilable.
            ChannelFuture future = bootstrap.connect().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            logger.error(e.getMessage(), e);
        } finally {
            group.shutdownGracefully().syncUninterruptibly(); // always release event-loop threads
        }
    }

    /** Writes a greeting when the connection becomes active and logs replies. */
    static class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String msg = "Client message!";
            ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
        }
    }
}
所謂單線程的 Reactor,就是隻有一個線程來通知和處理事件。
package com.niklai.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded Reactor demo: ONE Selector dispatches ACCEPT, READ and
 * WRITE events, and every handler runs on the selecting thread itself.
 */
public class ReactorServer {
    private static final Logger logger = LoggerFactory.getLogger(ReactorServer.class.getSimpleName());

    private Selector selector;
    private boolean loop = true;
    private ServerSocketChannel serverChannel;

    public ReactorServer() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Binds to localhost:9999 and runs the event loop: blocks on select(),
     * then executes the Runnable attached to each fired key in-line.
     */
    public void init() {
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket socket = serverChannel.socket();
            socket.bind(new InetSocketAddress("localhost", 9999));

            // Register for ACCEPT and attach the handler that runs on that event.
            SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Accept(serverChannel, selector));

            while (loop) {
                int select = selector.select(); // block until at least one event fires
                if (select != 0) {
                    Set<SelectionKey> readKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = readKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove(); // the selected set is never cleared by the Selector itself
                        Runnable runnable = (Runnable) key.attachment();
                        runnable.run(); // handler runs on this (the only) thread
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** ACCEPT handler: accepts the connection and wraps it in a Handler. */
    static class Accept implements Runnable {
        private ServerSocketChannel channel;
        private Selector selector;

        public Accept(ServerSocketChannel channel, Selector selector) {
            this.channel = channel;
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                SocketChannel client = channel.accept();
                new Handler(selector, client); // registers itself for READ on the same Selector
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Per-connection READ/WRITE handler implemented as a small state machine. */
    static class Handler implements Runnable {
        private Selector selector;
        private SocketChannel socket;
        private SelectionKey selectionKey;
        private HandleState state;

        public Handler(Selector selector, SocketChannel socket) {
            this.selector = selector;
            this.socket = socket;
            this.state = HandleState.READING;
            try {
                this.socket.configureBlocking(false);
                // Register with no interest first, then switch on READ.
                this.selectionKey = this.socket.register(this.selector, 0);
                this.selectionKey.interestOps(SelectionKey.OP_READ);
                this.selectionKey.attach(this); // this object handles both READ and WRITE
                this.selector.wakeup(); // let a blocked select() observe the new registration
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            switch (state) {
                case READING:
                    read();
                    break;
                case WRITING:
                    write();
                    break;
            }
        }

        private void read() {
            StringBuffer sb = new StringBuffer();
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                while (true) {
                    buf.clear();
                    int read = socket.read(buf);
                    if (read > 0) {
                        // BUGFIX: flip before decoding — the original decoded
                        // buf.asReadOnlyBuffer() without flipping, i.e. the UNREAD
                        // tail of the buffer instead of the bytes just received.
                        buf.flip();
                        sb.append(Charset.forName("utf-8").newDecoder().decode(buf).toString());
                    } else if (read < 0) {
                        // BUGFIX: peer closed the channel; the original had no -1
                        // branch and would spin in this loop forever.
                        selectionKey.cancel();
                        socket.close();
                        return;
                    } else {
                        logger.info("receive message: {}.....", sb.toString());
                        Thread.sleep(2000); // simulate slow business logic on the reactor thread
                        selectionKey.interestOps(SelectionKey.OP_WRITE); // now wait for writability
                        state = HandleState.WRITING;
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void write() {
            try {
                ByteBuffer output = ByteBuffer.wrap("Reactor server answer!".getBytes());
                socket.write(output.duplicate());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                selectionKey.cancel(); // request/response cycle done: stop receiving events
            }
        }

        private enum HandleState {
            READING, WRITING
        }
    }
}
經過 Selector.open()方法建立一個 Selector 事件通知器,ServerSocketChannel 註冊到 Selector 上監聽 ACCEPT 事件,並綁定對應的 Accept 事件處理器。當鏈接創建後觸發對應事件,建立 SocketChannel 並再次註冊到同一個 Selector 上,並監聽讀寫事件,同時綁定對應的 Handler 事件處理器。等待 READ 事件觸發讀取數據完成業務處理後觸發寫事件完成寫數據。整個過程當中,全部事件都由同一個 Selector 觸發和處理。
運行單元測試,並行初始化 6 個 Client 鏈接到 Server 上,查看控制檯
@Test public void test() throws InterruptedException { new Thread(() -> { // 服務端 new ReactorServer().init(); }).start(); Thread.sleep(1000); // 並行初始化多個Client,模擬併發效果 IntStream.range(1, 7).parallel().forEach(item -> { logger.info("Client No.{} init...", item); // 客戶端 Client.init(); }); }
從控制檯日誌中能夠看到,雖然 Client 是並行初始化鏈接到 Server 的,可是在 Server 端倒是同一個線程依次處理的,每次處理耗時 2 秒。這就是單線程 Reactor 的特色,資源利用率不高,在高併發的狀況下效率會很是的低,甚至會由於某些處理邏輯耗時太長致使後面的鏈接被拒絕。
爲了解決上面的問題,咱們能夠考慮將耗時的操做放在線程裏執行,這樣能夠避免 Selector 被阻塞。將 demo 中的 ReactorServer 改造一下
// 省略部分代碼 static class Handler implements Runnable { private Selector selector; private SocketChannel socket; private SelectionKey selectionKey; private HandleState state; private ExecutorService pool; public Handler(Selector selector, SocketChannel socket) { this.selector = selector; this.socket = socket; this.state = HandleState.READING; this.pool = Executors.newFixedThreadPool(4); // 增長一個線程池 try { this.socket.configureBlocking(false); this.selectionKey = this.socket.register(this.selector, 0); this.selectionKey.interestOps(SelectionKey.OP_READ); this.selectionKey.attach(this); this.selector.wakeup(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { switch (state) { case READING: state = HandleState.WORKING; // 改變當前handler狀態 pool.execute(() -> { // 將耗時操做放在線程池中執行 read(); }); break; case WRITING: write(); break; } } private void read() { StringBuffer sb = new StringBuffer(); ByteBuffer buf = ByteBuffer.allocate(1024); try { while (true) { buf.clear(); int read = socket.read(buf); sb.append(Charset.forName("utf-8").newDecoder().decode(buf.asReadOnlyBuffer()).toString()); if (read == 0) { logger.info("receive message: {}.....", sb.toString()); Thread.sleep(2000); selectionKey.interestOps(SelectionKey.OP_WRITE); state = HandleState.WRITING; selector.wakeup(); // 喚醒Selector,讓當前阻塞的selector.select()方法返回 break; } } } catch (Exception e) { e.printStackTrace(); } } private void write() { try { ByteBuffer output = ByteBuffer.wrap("Reactor server answer!".getBytes()); socket.write(output.duplicate()); } catch (IOException e) { e.printStackTrace(); } finally { selectionKey.cancel(); } } private enum HandleState { WORKING, READING, WRITING } }
運行上面的單元測試,查看控制檯
與以前運行的結果不一樣,耗時的 Read 方法都使用線程池的線程執行,整個流程相比以前總耗時要小不少。
從前面的例子能夠看到,即便是使用了線程池,從頭到尾都是一個 Selector 在負責事件分發和處理。當在分發以前的邏輯存在耗時長的狀況時,會影響到其餘事件的觸發。這樣咱們能夠將其分紅兩部分:監聽並創建鏈接能夠由一個獨立的 Selector 負責,而讀寫和業務操做能夠由另一個 Selector 負責,而且前一個 Selector 將創建好的鏈接分派給後一個 Selector。繼續修改 demo,實現上述過程。
package com.niklai.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Main/sub Reactor variant: the main Selector only accepts connections and
 * round-robins each accepted channel onto one of several sub-Selectors, each
 * of which runs READ/WRITE dispatch on its own thread.
 */
public class ReactorServer {
    private static final Logger logger = LoggerFactory.getLogger(ReactorServer.class.getSimpleName());

    private Selector mainSelector;    // main Selector: ACCEPT events only
    private Selector[] subSelectors;  // sub-Selectors: READ/WRITE events
    private int next = 0;             // round-robin index into subSelectors
    private int count = 2;            // number of sub-Selectors
    private boolean loop = true;
    private ServerSocketChannel serverChannel;

    public ReactorServer() {
        try {
            // Create the reusable sub-Selectors up front.
            subSelectors = new Selector[count];
            for (int i = 0; i < count; i++) {
                subSelectors[i] = Selector.open();
            }
            mainSelector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Binds localhost:9999, starts one HandlerLoop thread per sub-Selector,
     * then runs the accept loop on the calling thread.
     */
    public void init() {
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket socket = serverChannel.socket();
            socket.bind(new InetSocketAddress("localhost", 9999));
            SelectionKey selectionKey = serverChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
            selectionKey.attach((Runnable) () -> {
                try {
                    SocketChannel client = serverChannel.accept();
                    // Each new connection is bound to the next sub-Selector in turn.
                    // NOTE(review): Handler registers the channel on a Selector whose
                    // HandlerLoop thread may be blocked inside select(); register()
                    // can stall until that select() returns — verify the wakeup()
                    // inside Handler is sufficient in this demo.
                    new Handler(subSelectors[next], client);
                    next++;
                    if (next == count) {
                        next = 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            // One dedicated thread per sub-Selector so they never block each other.
            for (int i = 0; i < count; i++) {
                int finalI = i;
                new Thread(() -> {
                    new HandlerLoop(subSelectors[finalI]).run();
                }).start();
            }

            // Accept loop: dispatch attachments of fired keys on this thread.
            while (loop) {
                int select = mainSelector.select();
                if (select != 0) {
                    Set<SelectionKey> readKeys = mainSelector.selectedKeys();
                    Iterator<SelectionKey> iterator = readKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        Runnable runnable = (Runnable) key.attachment();
                        runnable.run();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Event loop for one sub-Selector: select, run attachments, repeat. */
    static class HandlerLoop implements Runnable {
        private Selector selector;

        public HandlerLoop(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    int select = selector.select();
                    if (select != 0) {
                        Set<SelectionKey> readKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = readKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            Runnable runnable = (Runnable) key.attachment();
                            runnable.run();
                            iterator.remove();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Handler implements Runnable {
        // unchanged, code omitted
    }
}
運行上面的單元測試,查看控制檯
在這裏只有兩個從 Selector,卻同時處理了六個 Socket 鏈接,能夠達到使用較少的線程來處理大量的請求。
在上面的例子裏,每 accept 一個 SocketChannel,就會建立一個 handler,雖然多個 SocketChannel 共享了一個 Selector,可是每一個 SocketChannel 對應的 handler 裏都有一個獨立的線程池。在高併發下,會建立大量的線程池,消耗大量的資源。爲了解決這個問題,咱們能夠給每個 Selector 分配一個線程池,用同一個線程池的線程來處理同一個 Selector 關聯下的多個 SocketChannel。
// Final variant: each sub-Selector owns ONE shared worker pool, so all the
// connections on the same Selector reuse threads instead of every Handler
// creating its own pool.
public class ReactorServer {
    // code omitted

    private ExecutorService[] pools; // one worker pool per sub-Selector

    public ReactorServer() {
        try {
            subSelectors = new Selector[count];
            pools = new ExecutorService[count]; // pool array sized to match the sub-Selector array
            for (int i = 0; i < count; i++) {
                subSelectors[i] = Selector.open();
                pools[i] = Executors.newFixedThreadPool(10); // shared pool for this Selector's handlers
            }
            mainSelector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void init() {
        try {
            // code omitted
            selectionKey.attach((Runnable) () -> {
                try {
                    SocketChannel client = serverChannel.accept();
                    // Hand the accepted channel the pool belonging to the chosen sub-Selector.
                    new Handler(subSelectors[next], client, pools[next]);
                    next++;
                    if (next == count) {
                        next = 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            // code omitted
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class Handler implements Runnable {
        // code omitted

        // Backward-compatible constructor: callers that pass no pool still get
        // a private 4-thread pool, as in the previous version.
        public Handler(Selector selector, SocketChannel socket) {
            this(selector, socket, Executors.newFixedThreadPool(4));
        }

        public Handler(Selector selector, SocketChannel socket, ExecutorService pool) {
            this.selector = selector;
            this.socket = socket;
            this.state = HandleState.READING;
            this.pool = pool; // shared pool injected by the server
            try {
                this.socket.configureBlocking(false);
                this.selectionKey = this.socket.register(this.selector, 0);
                this.selectionKey.interestOps(SelectionKey.OP_READ);
                this.selectionKey.attach(this);
                this.selector.wakeup(); // let a blocked select() observe the new registration
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // code omitted
    }
}