Netty(七):EventLoop學習前導——Reactor模式

瞭解Netty的人多少都會知道Netty的高性能的一個緣由就是它是基於事件驅動的,而這一事件的原型就是Reactor模式。java

因此在學習EventLoop前,頗有必要先搞懂Reactor模式。react

本文目錄:git

  • 傳統的服務器設計
  • Basic Reactor(單線程模式)
  • MultiThreadReactor(多線程模式)
  • 主從多線程模型

 

 

 

傳統的服務器設計模式:github

  先來簡單的介紹下傳統的服務器設計模式。設計模式

  看從圖例瞭解:服務器

  

  傳統的服務器設計模式是基於IO實現的。服務器在等待鏈接,及IO準備就緒前都會被阻塞。網絡

  代碼示例以下:多線程

  

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
            new Thread(new Handler(ss.accept())).start();      
        } catch (IOException ex) { /* ... */ }
    }
 
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }       
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

 

 傳統的服務器模式的優點在於實現簡便,相對於NIO的服務器,它的代碼量更少,更直接。但它最大的缺點就是IO阻塞致使運行效率低下。socket

 

 Reactor模式:ide

  Reactor模式是利用NIO的多路複用而設計的一種基於事件驅動的服務器模式。主要的設計目的是經過分而治之的思想讓服務器實現可擴容的目標。

  

  Basic Reactor(單線程版本):

  Basic Reactor是Reactor模式最基礎的版本,能夠說是定義了整個Reactor模式的大骨架,其餘複雜的版本也是在此基礎上演變而來。

  深刻了解Basic Reactor是掌握Reactor模式的基本,所以咱們會用最多的內容去理解Basic Reactor。

  

  不管是Reactor模式的哪些變化,基本上都離不開下列三種角色:

  Reactor(反應堆):服務器啓動的主入口

  Acceptor(接收器):主要負責處理IO鏈接事件

  Handler(處理器):負責處理IO讀寫以及業務邏輯處理等

 

  先結合圖例來了解下Reactor:

  

  圖中已經明顯畫出了Reactor和Acceptor的角色,而未畫出的Handler部分就是黃色圓圈的部分(read,decode, compute, encode, send 構成了一個Handler的基本職能)

 

  在經過代碼來分析下:

  

  1 package com.insaneXs.netty.reactor.basic;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.ServerSocketChannel;
  9 import java.nio.channels.SocketChannel;
 10 import java.util.Iterator;
 11 import java.util.Set;
 12 
 13 /**
 14  * @Author: insaneXs
 15  * @Description:
 16  * @Date: Create at 2018-12-19
 17  */
 18 public class Reactor implements Runnable{
 19 
 20     final Selector selector;
 21 
 22     final ServerSocketChannel serverSocket;
 23 
 24     Reactor(int port) throws Exception{
 25 
 26         //建立ServerSocketChannel,綁定端口,設置爲非阻塞,選擇器上註冊ACCEPT事件
 27         selector = Selector.open();
 28         serverSocket = ServerSocketChannel.open();
 29 
 30         serverSocket.bind(new InetSocketAddress(port));
 31         serverSocket.configureBlocking(false);
 32         SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
 33 
 34         sk.attach(new Acceptor());
 35     }
 36 
 37     @Override
 38     public void run() {
 39         try {
 40             while (!Thread.interrupted()) {
 41                 //阻塞,直到註冊的事件發生
 42                 selector.select();
 43                 Set selected = selector.selectedKeys();
 44                 Iterator it = selected.iterator();
 45                 while (it.hasNext()){
 46                     //任務派發
 47                     dispatch((SelectionKey)(it.next()));
 48                 }
 49                 selected.clear();
 50             }
 51         } catch (IOException ex) {
 52             ex.printStackTrace();
 53         }
 54 
 55     }
 56 
 57     void dispatch(SelectionKey k) {
 58         //經過將不一樣的附件綁定到SelectionKey上,實現dispatch統一派發Acceptor和Handler的邏輯
 59         Runnable r = (Runnable)(k.attachment());
 60         if (r != null)
 61             r.run();
 62     }
 63 
 64     class Acceptor implements Runnable{
 65         @Override
 66         public void run() {
 67             try {
 68                 //ACCEPT負責接收連接
 69                 SocketChannel sc = serverSocket.accept();
 70                 if(sc != null)
 71                     new Handler(selector, sc);
 72             } catch (IOException e) {
 73                 e.printStackTrace();
 74             }
 75         }
 76     }
 77 
 78     class Handler implements Runnable{
 79         final SocketChannel socket;
 80 
 81         final SelectionKey sk;
 82 
 83         ByteBuffer input = ByteBuffer.allocate(1024);
 84         ByteBuffer output = ByteBuffer.allocate(1024);
 85 
 86         static final int READING = 0, SENDING = 1;
 87         int state = READING;
 88 
 89         Handler(Selector sel, SocketChannel c) throws IOException{
 90             socket = c;
 91             c.configureBlocking(false);
 92             // Optionally try first read now
 93             //返回了新的SelectionKey,將Handler添加爲SelectionKey的附件,先註冊READ事件
 94             sk = socket.register(sel, 0);
 95             sk.attach(this);
 96             sk.interestOps(SelectionKey.OP_READ);
 97             sel.wakeup();
 98         }
 99 
100         boolean inputIsComplete() {
101             return true;
102         }
103         boolean outputIsComplete() {
104             return true;
105         }
106         void process() {
107             //DO SOME THING
108         }
109 
110         @Override
111         public void run() {
112             try {
113                 if (state == READING) read();
114                 else if (state == SENDING) send();
115             } catch (IOException ex) {
116                 ex.printStackTrace();
117             }
118         }
119 
120         void read() throws IOException {
121             socket.read(input);
122             if (inputIsComplete()) {
123                 process();
124                 state = SENDING;
125             // Normally also do first write now
126                 sk.interestOps(SelectionKey.OP_WRITE);
127             }
128         }
129 
130         void send() throws IOException {
131             socket.write(output);
132             if (outputIsComplete()) sk.cancel();
133         }
134 
135     }
136 }

 

  瞭解完Reactor中的角色分工,再看代碼其實並不複雜。代碼關鍵的部分也都加上了註釋。

  每一個角色的業務處理邏輯都是以run方法爲入口,

  Reactor中run方法處理的主要邏輯就是監聽NIO的多路複用,並經過dispatch方法分發任務。

  Acceptor中run方法處理的主要邏輯就是接收鏈接,併爲處理讀寫作準備。

  Handler中run方法處理的主要邏輯就是讀寫和業務邏輯的處理。

  

  有幾點值得注意的:

  第一,這段代碼最關鍵的地方就是在Reactor進行任務分發時,利用SelectionKey的Attach添加附件的方法實現了用同一入口分發給Acceptor和Handler(這是設計的比較巧妙的部分)。

  第二,不管是哪一個角色都實現了Runnable,這也保證了即便是其餘多線程版本,只須要修改部分代碼,而不用動整個Reactor模式的骨架。

  第三,咱們能夠看到上面的代碼都是直接調用run方法,而不是經過Thread.start方法來運行,說明Basic Reactor的處理過程確實是單線程下的。

  另外提到一點就是Handler的構造函數中先是register的0,而後再設置SelectionKey的interestOps爲OP_READ。這點在以前的Netty源碼分析中,咱們也瞭解到,Netty正是這樣的過程。  

 

  將代碼轉換成時序圖,加深對代碼的印象:

  

 

 

  Basic Reactor優勢與不足:

  優勢:利用了NIO的特性,能夠僅用一條線程處理多個通道的鏈接處理。相較於傳統的服務器模式,這樣對資源的消耗更少。

  不足:咱們能夠看到不只IO的部分由Reactor的線程處理,連業務處理的邏輯一樣是放在Reactor的線程中處理,這樣可能就會致使Reactor線程積累愈來愈多的請求,致使效率降低。

  

  MultiThreads版本的Reactor模型,正是爲了解決上述的問題。

  一樣先經過圖例來了解這個模式下,各個角色的關係:

  

  

  這個圖和Basic Reactor的區別是什麼?咱們又該如何理解呢?

  咱們能夠看到以前的Handler處理的角色被一分爲二,read,send(也就是IO的讀寫)和Basic Reactor中的模式不變,可是decode,compute,encode(也就是業務處理的邏輯)被拆出來,提交給ThreadPool運行。

  

  新的Reactor模式對比Basic Reactor,其餘代碼不變,只是咱們修改了Handler,增長了一個新的角色,叫作Processor,做爲負責處理業務邏輯的單元:

  

 1 public class ThreadPooledHandler implements Runnable{
 2     final SocketChannel socket;
 3     final SelectionKey sk;
 4     ByteBuffer input = ByteBuffer.allocate(1024);
 5     ByteBuffer output = ByteBuffer.allocate(1024);
 6     static final int READING = 0, SENDING = 1;
 7     static final int PROCESSING = 3;
 8     int state = READING;
 9 
10     // uses util.concurrent thread pool
11     static ExecutorService pool = Executors.newFixedThreadPool(4);
12 
13     ThreadPooledHandler(Selector sel, SocketChannel c) throws IOException {
14         socket = c;
15         c.configureBlocking(false);
16         // Optionally try first read now
17         //返回了新的SelectionKey,將Handler添加爲SelectionKey的附件,先註冊READ事件
18         sk = socket.register(sel, 0);
19         sk.attach(this);
20         sk.interestOps(SelectionKey.OP_READ);
21         sel.wakeup();
22     }
23 
24     boolean inputIsComplete() {
25         return true;
26     }
27     boolean outputIsComplete() {
28         return true;
29     }
30 
31     void process() {
32         //DO SOME THING
33     }
34 
35     @Override
36     public void run() {
37         try {
38             if (state == READING) read();
39             else if (state == SENDING) send();
40         } catch (IOException ex) {
41             ex.printStackTrace();
42         }
43     }
44 
45     synchronized void read() throws IOException { // ...
46         socket.read(input);
47         if (inputIsComplete()) {
48             state = PROCESSING;
49             pool.execute(new Processer());
50         }
51     }
52 
53     void send() throws IOException {
54         socket.write(output);
55         if (outputIsComplete()) sk.cancel();
56     }
57     synchronized void processAndHandOff() {
58         process();
59         state = SENDING; // or rebind attachment
60         sk.interestOps(SelectionKey.OP_WRITE);
61     }
62 
63     //增長Processer角色,處理業務邏輯
64     class Processer implements Runnable {
65         public void run() { processAndHandOff(); }
66     }
67 
68 }

 

爲了方便看出變化,我將兩個版本的代碼放在一塊兒,作了對比圖:

 

最大的區別就是原先Handler中process方法被交給了Processer執行,而且在執行時,是提交給線程池去執行。而Handler負責的IO讀寫邏輯仍然在Reactor的線程中執行(只是非網絡IO的業務邏輯部分在新的線程中執行)。

相對於BasicReactor,這個版本的Reactor能更好的利用現代多核CPU的性能。讓一條線程負責處理IO,而其餘線程執行業務邏輯。多路複用上監聽的阻塞,並不會阻塞業務邏輯的執行。

 

主從複合的Reactor模型

多線程的Reactor模型處理能力已經很是的高效,可是IO的鏈接過程仍然多是個耗時的過程(好比SSL認證)。所以引出了一個新的變化——主從複合的Reactor模型。

先看圖例:

 

 

 和上一個版本比較,這個版本的Reactor區別主要是將Reactor拆分一個MainReactor(負責處理Accept事件)和多個SubReactor(負責處理IO讀寫事件)。

而MainReactor和SubReactor的關聯只要是經過Acceptor。

咱們知道Reactor和Selector的關係是一對一的關係。一般一個Reactor由一條獨立的線程執行。該線程在Reactor關聯的Selector是監聽事件。

所以這個模式下,當Accept在爲鏈接進來的SocketChannel綁定Selector時,再也不是綁定到MainReactor對應的Selector中,而是綁定到其餘Reactor對應的Selector上(對應其餘線程)。

這也所以讓MainReactor只負責執行ACCEPT,而SubReactor負責IO讀寫。也使得ACCEPT上費時的操做將不會影響IO讀寫和業務邏輯處理。

 

貼上代碼:

增長SubReactor:

 1 public class SubReactor implements Runnable{
 2     private final Selector selector;
 3 
 4     public SubReactor() throws IOException {
 5         selector = Selector.open();
 6     }
 7 
 8     @Override
 9     public void run() {
10         while(!Thread.interrupted()){
11             try {
12                 selector.select();
13                 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
14                 while(iter.hasNext()){
15                     SelectionKey sk = iter.next();
16                     ((Runnable)sk.attachment()).run();
17                 }
18             } catch (IOException e) {
19                 e.printStackTrace();
20             }
21         }
22     }
23 
24     public Selector getSelector(){
25         return selector;
26     }
27 }

SubReactor的代碼和Basic Reactor中Reactor的代碼很類似,由於不處理鏈接部分,因此沒有ServerSocketChannel和綁定監聽端口的操做。

 

接下來看MainReactor和Acceptor的代碼:

 1 package com.insaneXs.netty.reactor.multiple;
 2 
 3 import com.insaneXs.netty.reactor.threadpooled.ThreadPooledHandler;
 4 
 5 import java.io.IOException;
 6 import java.net.InetSocketAddress;
 7 import java.nio.channels.SelectionKey;
 8 import java.nio.channels.Selector;
 9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.util.Iterator;
12 import java.util.Set;
13 
14 /**
15  * @Author: insaneXs
16  * @Description:
17  * @Date: Create at 2018-12-21
18  */
19 public class MainReactor implements Runnable{
20     final Selector selector;
21 
22     final ServerSocketChannel serverSocket;
23 
24     private final static int SUB_REACTOR_COUNT = 3;
25 
26     private final Selector[] selectors = new Selector[SUB_REACTOR_COUNT];
27 
28     MainReactor(int port) throws Exception{
29 
30         //建立ServerSocketChannel,綁定端口,設置爲非阻塞,選擇器上註冊ACCEPT事件
31         selector = Selector.open();
32         serverSocket = ServerSocketChannel.open();
33 
34         serverSocket.bind(new InetSocketAddress(port));
35         serverSocket.configureBlocking(false);
36         SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
37 
38         for(int i=0; i<selectors.length; i++){
39 
40             //建立SUB-REACTOR,並保存對應的Selector對象
41             SubReactor subReactor = new SubReactor();
42             selectors[i] = subReactor.getSelector();
43             //爲SUB-REACTOR啓動獨立的線程
44             new Thread(subReactor).start();
45         }
46 
47         sk.attach(new Acceptor());
48     }
49 
50     @Override
51     public void run() {
52         try {
53             while (!Thread.interrupted()) {
54                 //阻塞,直到註冊的事件發生
55                 selector.select();
56                 Set selected = selector.selectedKeys();
57                 Iterator it = selected.iterator();
58                 while (it.hasNext()){
59                     //任務派發
60                     dispatch((SelectionKey)(it.next()));
61                 }
62                 selected.clear();
63             }
64         } catch (IOException ex) {
65             ex.printStackTrace();
66         }
67 
68     }
69 
70     void dispatch(SelectionKey k) {
71         //經過將不一樣的附件綁定到SelectionKey上,實現dispatch統一派發Acceptor和Handler的邏輯
72         Runnable r = (Runnable)(k.attachment());
73         if (r != null)
74             r.run();
75     }
76 
77     class Acceptor implements Runnable{
78         private int idx = 0;
79         @Override
80         public void run() {
81             try {
82                 //ACCEPT負責接收連接
83                 SocketChannel sc = serverSocket.accept();
84                 if(sc != null)//將SocketChannel與SubReactor的Selector均勻綁定
85                     new ThreadPooledHandler(selectors[idx], sc);
86 
87                 idx++;
88                 if(idx == SUB_REACTOR_COUNT)
89                     idx = 0;
90             } catch (IOException e) {
91                 e.printStackTrace();
92             }
93         }
94     }
95 
96 
97 
98 }

 

作個對比圖,比較下和以前的版本的差別:

MainReactor:

區別主要在MainReactor內部保存了一些SubReactor,在MainReactor被建立時,同時建立了幾個SubReactor。而且建立線程獨立的運行SubReactor。

 

再看看Acceptor:

兩者Acceptor的區別就是當把Handler提交給線程池時,非主從複合結構的版本仍然是用一個Selector。而主從複合結構的Handler在處理時,用的多路複用器是SubReactor中的。所以分離出了ACCEPT和IO讀寫。

 

 

本文參考:Scalable IO in Java

本文代碼:Github

相關文章
相關標籤/搜索