瞭解Netty的人多少都會知道Netty的高性能的一個緣由就是它是基於事件驅動的,而這一事件的原型就是Reactor模式。java
因此在學習EventLoop前,頗有必要先搞懂Reactor模式。react
本文目錄:git
傳統的服務器設計模式:github
先來簡單的介紹下傳統的服務器設計模式。設計模式
看從圖例瞭解:服務器
傳統的服務器設計模式是基於IO實現的。服務器在等待鏈接,及IO準備就緒前都會被阻塞。網絡
代碼示例以下:多線程
class Server implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); } catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } }
傳統的服務器模式的優點在於實現簡便,相對於NIO的服務器,它的代碼量更少,更直接。但它最大的缺點就是IO阻塞致使運行效率低下。socket
Reactor模式:ide
Reactor模式是利用NIO的多路複用而設計的一種基於事件驅動的服務器模式。主要的設計目的是經過分而治之的思想讓服務器實現可擴容的目標。
Basic Reactor(單線程版本):
Basic Reactor是Reactor模式最基礎的版本,能夠說是定義了整個Reactor模式的大骨架,其餘複雜的版本也是在此基礎上演變而來。
深刻了解Basic Reactor是掌握Reactor模式的基本,所以咱們會用最多的內容去理解Basic Reactor。
不管是Reactor模式的哪些變化,基本上都離不開下列三種角色:
Reactor(反應堆):服務器啓動的主入口
Acceptor(接收器):主要負責處理IO鏈接事件
Handler(處理器):負責處理IO讀寫以及業務邏輯處理等
先結合圖例來了解下Reactor:
圖中已經明顯畫出了Reactor和Acceptor的角色,而未畫出的Handler部分就是黃色圓圈的部分(read,decode, compute, encode, send 構成了一個Handler的基本職能)
在經過代碼來分析下:
1 package com.insaneXs.netty.reactor.basic; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 /** 14 * @Author: insaneXs 15 * @Description: 16 * @Date: Create at 2018-12-19 17 */ 18 public class Reactor implements Runnable{ 19 20 final Selector selector; 21 22 final ServerSocketChannel serverSocket; 23 24 Reactor(int port) throws Exception{ 25 26 //建立ServerSocketChannel,綁定端口,設置爲非阻塞,選擇器上註冊ACCEPT事件 27 selector = Selector.open(); 28 serverSocket = ServerSocketChannel.open(); 29 30 serverSocket.bind(new InetSocketAddress(port)); 31 serverSocket.configureBlocking(false); 32 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); 33 34 sk.attach(new Acceptor()); 35 } 36 37 @Override 38 public void run() { 39 try { 40 while (!Thread.interrupted()) { 41 //阻塞,直到註冊的事件發生 42 selector.select(); 43 Set selected = selector.selectedKeys(); 44 Iterator it = selected.iterator(); 45 while (it.hasNext()){ 46 //任務派發 47 dispatch((SelectionKey)(it.next())); 48 } 49 selected.clear(); 50 } 51 } catch (IOException ex) { 52 ex.printStackTrace(); 53 } 54 55 } 56 57 void dispatch(SelectionKey k) { 58 //經過將不一樣的附件綁定到SelectionKey上,實現dispatch統一派發Acceptor和Handler的邏輯 59 Runnable r = (Runnable)(k.attachment()); 60 if (r != null) 61 r.run(); 62 } 63 64 class Acceptor implements Runnable{ 65 @Override 66 public void run() { 67 try { 68 //ACCEPT負責接收連接 69 SocketChannel sc = serverSocket.accept(); 70 if(sc != null) 71 new Handler(selector, sc); 72 } catch (IOException e) { 73 e.printStackTrace(); 74 } 75 } 76 } 77 78 class Handler implements Runnable{ 79 final SocketChannel socket; 80 81 final SelectionKey sk; 82 83 ByteBuffer input = ByteBuffer.allocate(1024); 84 ByteBuffer output = ByteBuffer.allocate(1024); 85 86 static final int READING = 0, SENDING = 1; 87 int state = READING; 88 89 Handler(Selector sel, SocketChannel c) throws IOException{ 90 socket = c; 91 c.configureBlocking(false); 92 // Optionally try first read now 93 //返回了新的SelectionKey,將Handler添加爲SelectionKey的附件,先註冊READ事件 94 sk = socket.register(sel, 0); 95 sk.attach(this); 96 sk.interestOps(SelectionKey.OP_READ); 97 sel.wakeup(); 98 } 99 100 boolean inputIsComplete() { 101 return true; 102 } 103 boolean outputIsComplete() { 104 return true; 105 } 106 void process() { 107 //DO SOME THING 108 } 109 110 @Override 111 public void run() { 112 try { 113 if (state == READING) read(); 114 else if (state == SENDING) send(); 115 } catch (IOException ex) { 116 ex.printStackTrace(); 117 } 118 } 119 120 void read() throws IOException { 121 socket.read(input); 122 if (inputIsComplete()) { 123 process(); 124 state = SENDING; 125 // Normally also do first write now 126 sk.interestOps(SelectionKey.OP_WRITE); 127 } 128 } 129 130 void send() throws IOException { 131 socket.write(output); 132 if (outputIsComplete()) sk.cancel(); 133 } 134 135 } 136 }
瞭解完Reactor中的角色分工,再看代碼其實並不複雜。代碼關鍵的部分也都加上了註釋。
每一個角色的業務處理邏輯都是以run方法爲入口,
Reactor中run方法處理的主要邏輯就是監聽NIO的多路複用,並經過dispatch方法分發任務。
Acceptor中run方法處理的主要邏輯就是接收鏈接,併爲處理讀寫作準備。
Handler中run方法處理的主要邏輯就是讀寫和業務邏輯的處理。
有幾點值得注意的:
第一,這段代碼最關鍵的地方就是在Reactor進行任務分發時,利用SelectionKey的Attach添加附件的方法實現了用同一入口分發給Acceptor和Handler(這是設計的比較巧妙的部分)。
第二,不管是哪一個角色都實現了Runnable,這也保證了即便是其餘多線程版本,只須要修改部分代碼,而不用動整個Reactor模式的骨架。
第三,咱們能夠看到上面的代碼都是直接調用run方法,而不是經過Thread.start方法來運行,說明Basic Reactor的處理過程確實是單線程下的。
另外提到一點就是Handler的構造函數中先是register的0,而後再設置SelectionKey的interestOps爲OP_READ。這點在以前的Netty源碼分析中,咱們也瞭解到,Netty正是這樣的過程。
將代碼轉換成時序圖,加深對代碼的印象:
Basic Reactor優勢與不足:
優勢:利用了NIO的特性,能夠僅用一條線程處理多個通道的鏈接處理。相較於傳統的服務器模式,這樣對資源的消耗更少。
不足:咱們能夠看到不只IO的部分由Reactor的線程處理,連業務處理的邏輯一樣是放在Reactor的線程中處理,這樣可能就會致使Reactor線程積累愈來愈多的請求,致使效率降低。
MultiThreads版本的Reactor模型,正是爲了解決上述的問題。
一樣先經過圖例來了解這個模式下,各個角色的關係:
這個圖和Basic Reactor的區別是什麼?咱們又該如何理解呢?
咱們能夠看到以前的Handler處理的角色被一分爲二,read,send(也就是IO的讀寫)和Basic Reactor中的模式不變,可是decode,compute,encode(也就是業務處理的邏輯)被拆出來,提交給ThreadPool運行。
新的Reactor模式對比Basic Reactor,其餘代碼不變,只是咱們修改了Handler,增長了一個新的角色,叫作Processor,做爲負責處理業務邏輯的單元:
1 public class ThreadPooledHandler implements Runnable{ 2 final SocketChannel socket; 3 final SelectionKey sk; 4 ByteBuffer input = ByteBuffer.allocate(1024); 5 ByteBuffer output = ByteBuffer.allocate(1024); 6 static final int READING = 0, SENDING = 1; 7 static final int PROCESSING = 3; 8 int state = READING; 9 10 // uses util.concurrent thread pool 11 static ExecutorService pool = Executors.newFixedThreadPool(4); 12 13 ThreadPooledHandler(Selector sel, SocketChannel c) throws IOException { 14 socket = c; 15 c.configureBlocking(false); 16 // Optionally try first read now 17 //返回了新的SelectionKey,將Handler添加爲SelectionKey的附件,先註冊READ事件 18 sk = socket.register(sel, 0); 19 sk.attach(this); 20 sk.interestOps(SelectionKey.OP_READ); 21 sel.wakeup(); 22 } 23 24 boolean inputIsComplete() { 25 return true; 26 } 27 boolean outputIsComplete() { 28 return true; 29 } 30 31 void process() { 32 //DO SOME THING 33 } 34 35 @Override 36 public void run() { 37 try { 38 if (state == READING) read(); 39 else if (state == SENDING) send(); 40 } catch (IOException ex) { 41 ex.printStackTrace(); 42 } 43 } 44 45 synchronized void read() throws IOException { // ... 46 socket.read(input); 47 if (inputIsComplete()) { 48 state = PROCESSING; 49 pool.execute(new Processer()); 50 } 51 } 52 53 void send() throws IOException { 54 socket.write(output); 55 if (outputIsComplete()) sk.cancel(); 56 } 57 synchronized void processAndHandOff() { 58 process(); 59 state = SENDING; // or rebind attachment 60 sk.interestOps(SelectionKey.OP_WRITE); 61 } 62 63 //增長Processer角色,處理業務邏輯 64 class Processer implements Runnable { 65 public void run() { processAndHandOff(); } 66 } 67 68 }
爲了方便看出變化,我將兩個版本的代碼放在一塊兒,作了對比圖:
最大的區別就是原先Handler中process方法被交給了Processer執行,而且在執行時,是提交給線程池去執行。而Handler負責的IO讀寫邏輯仍然在Reactor的線程中執行(只是非網絡IO的業務邏輯部分在新的線程中執行)。
相對於BasicReactor,這個版本的Reactor能更好的利用現代多核CPU的性能。讓一條線程負責處理IO,而其餘線程執行業務邏輯。多路複用上監聽的阻塞,並不會阻塞業務邏輯的執行。
主從複合的Reactor模型
多線程的Reactor模型處理能力已經很是的高效,可是IO的鏈接過程仍然多是個耗時的過程(好比SSL認證)。所以引出了一個新的變化——主從複合的Reactor模型。
先看圖例:
和上一個版本比較,這個版本的Reactor區別主要是將Reactor拆分一個MainReactor(負責處理Accept事件)和多個SubReactor(負責處理IO讀寫事件)。
而MainReactor和SubReactor的關聯只要是經過Acceptor。
咱們知道Reactor和Selector的關係是一對一的關係。一般一個Reactor由一條獨立的線程執行。該線程在Reactor關聯的Selector是監聽事件。
所以這個模式下,當Accept在爲鏈接進來的SocketChannel綁定Selector時,再也不是綁定到MainReactor對應的Selector中,而是綁定到其餘Reactor對應的Selector上(對應其餘線程)。
這也所以讓MainReactor只負責執行ACCEPT,而SubReactor負責IO讀寫。也使得ACCEPT上費時的操做將不會影響IO讀寫和業務邏輯處理。
貼上代碼:
增長SubReactor:
1 public class SubReactor implements Runnable{ 2 private final Selector selector; 3 4 public SubReactor() throws IOException { 5 selector = Selector.open(); 6 } 7 8 @Override 9 public void run() { 10 while(!Thread.interrupted()){ 11 try { 12 selector.select(); 13 Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 14 while(iter.hasNext()){ 15 SelectionKey sk = iter.next(); 16 ((Runnable)sk.attachment()).run(); 17 } 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 } 22 } 23 24 public Selector getSelector(){ 25 return selector; 26 } 27 }
SubReactor的代碼和Basic Reactor中Reactor的代碼很類似,由於不處理鏈接部分,因此沒有ServerSocketChannel和綁定監聽端口的操做。
接下來看MainReactor和Acceptor的代碼:
1 package com.insaneXs.netty.reactor.multiple; 2 3 import com.insaneXs.netty.reactor.threadpooled.ThreadPooledHandler; 4 5 import java.io.IOException; 6 import java.net.InetSocketAddress; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Iterator; 12 import java.util.Set; 13 14 /** 15 * @Author: insaneXs 16 * @Description: 17 * @Date: Create at 2018-12-21 18 */ 19 public class MainReactor implements Runnable{ 20 final Selector selector; 21 22 final ServerSocketChannel serverSocket; 23 24 private final static int SUB_REACTOR_COUNT = 3; 25 26 private final Selector[] selectors = new Selector[SUB_REACTOR_COUNT]; 27 28 MainReactor(int port) throws Exception{ 29 30 //建立ServerSocketChannel,綁定端口,設置爲非阻塞,選擇器上註冊ACCEPT事件 31 selector = Selector.open(); 32 serverSocket = ServerSocketChannel.open(); 33 34 serverSocket.bind(new InetSocketAddress(port)); 35 serverSocket.configureBlocking(false); 36 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); 37 38 for(int i=0; i<selectors.length; i++){ 39 40 //建立SUB-REACTOR,並保存對應的Selector對象 41 SubReactor subReactor = new SubReactor(); 42 selectors[i] = subReactor.getSelector(); 43 //爲SUB-REACTOR啓動獨立的線程 44 new Thread(subReactor).start(); 45 } 46 47 sk.attach(new Acceptor()); 48 } 49 50 @Override 51 public void run() { 52 try { 53 while (!Thread.interrupted()) { 54 //阻塞,直到註冊的事件發生 55 selector.select(); 56 Set selected = selector.selectedKeys(); 57 Iterator it = selected.iterator(); 58 while (it.hasNext()){ 59 //任務派發 60 dispatch((SelectionKey)(it.next())); 61 } 62 selected.clear(); 63 } 64 } catch (IOException ex) { 65 ex.printStackTrace(); 66 } 67 68 } 69 70 void dispatch(SelectionKey k) { 71 //經過將不一樣的附件綁定到SelectionKey上,實現dispatch統一派發Acceptor和Handler的邏輯 72 Runnable r = (Runnable)(k.attachment()); 73 if (r != null) 74 r.run(); 75 } 76 77 class Acceptor implements Runnable{ 78 private int idx = 0; 79 @Override 80 public void run() { 81 try { 82 //ACCEPT負責接收連接 83 SocketChannel sc = serverSocket.accept(); 84 if(sc != null)//將SocketChannel與SubReactor的Selector均勻綁定 85 new ThreadPooledHandler(selectors[idx], sc); 86 87 idx++; 88 if(idx == SUB_REACTOR_COUNT) 89 idx = 0; 90 } catch (IOException e) { 91 e.printStackTrace(); 92 } 93 } 94 } 95 96 97 98 }
作個對比圖,比較下和以前的版本的差別:
MainReactor:
區別主要在MainReactor內部保存了一些SubReactor,在MainReactor被建立時,同時建立了幾個SubReactor。而且建立線程獨立的運行SubReactor。
再看看Acceptor:
兩者Acceptor的區別就是當把Handler提交給線程池時,非主從複合結構的版本仍然是用一個Selector。而主從複合結構的Handler在處理時,用的多路複用器是SubReactor中的。所以分離出了ACCEPT和IO讀寫。
本文參考:Scalable IO in Java
本文代碼:Github