Reactor模式

 

瘋狂創客圈,一個Java 高併發研習社羣博客園 總入口html

瘋狂創客圈,傾力推出:面試必備 + 面試必備 + 面試必備 的基礎原理+實戰 書籍 《Netty Zookeeper Redis 高併發實戰java

 


寫在前面 

 

 你們好,我是 高併發的實戰社羣【瘋狂創客圈】尼恩。Reactor模式很是重要,不管開發、仍是面試。react

本文的內容,在《Netty Zookeeper Redis 高併發實戰》一書時,進行內容的完善和更新,而且進行的源碼的升級。 博客和書不同,書更加層層升入、井井有條,請你們以書的內容爲準。 具體請參考書的第四章 —— 鼎鼎大名的Reactor反應器模式 。面試

 

基礎篇:netty源碼  死磕3-編程

傳說中神同樣的Reactor反應器模式設計模式

本文目錄

1. 爲何是Reactor模式
2. Reactor模式簡介
3. 多線程IO的致命缺陷
4. 單線程Reactor模型
4.1. 什麼是單線程Reactor呢?
4.2. 單線程Reactor的參考代碼
4.3. 單線程模式的缺點:
5. 多線程的Reactor
5.1. 基於線程池的改進
5.2. 改進後的完整示意圖
5.3. 多線程Reactor的參考代碼
6. Reactor持續改進
7. Reactor編程的優勢和缺點
7.1. 優勢
7.2. 缺點tomcat

 

1. 爲何是Reactor模式

寫多了代碼的兄弟們都知道,JAVA代碼因爲處處面向接口及高度抽象,用到繼承多態和設計模式,程序的組織不是按照正常的理解順序來的,對代碼跟蹤非常個問題。因此,在閱讀別人的源碼時,若是不瞭解代碼的組織方式,每每是暈頭轉向,不知在何處。尤爲是閱讀經典代碼的時候,更是如此。服務器

反過來,若是先了解代碼的設計模式,再來去代碼,就會閱讀的很輕鬆,不會那麼難懂。網絡

像netty這樣的精品中的極品,確定也是須要先從設計模式入手的。netty的總體架構,基於了一個著名的模式——Reactor模式。Reactor模式,是高性能網絡編程的必知必會模式。多線程

首先熟悉Reactor模式,必定是磨刀不誤砍柴工。

2. Reactor模式簡介

Netty是典型的Reactor模型結構,關於Reactor的詳盡闡釋,本文站在巨人的肩膀上,藉助 Doug Lea(就是那位讓人無限景仰的大爺)的「Scalable IO in Java」中講述的Reactor模式。

「Scalable IO in Java」的地址是:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Reactor模式也叫反應器模式,大多數IO相關組件如Netty、Redis在使用的IO模式,爲何須要這種模式,它是如何設計來解決高性能併發的呢?

3. 多線程IO的致命缺陷

最最原始的網絡編程思路就是服務器用一個while循環,不斷監聽端口是否有新的套接字鏈接,若是有,那麼就調用一個處理函數處理,相似:

while(true){

socket = accept();

handle(socket)

}

這種方法的最大問題是沒法併發,效率過低,若是當前的請求沒有處理完,那麼後面的請求只能被阻塞,服務器的吞吐量過低。

以後,想到了使用多線程,也就是很經典的connection per thread,每個鏈接用一個線程處理,相似:

package com.crazymakercircle.iodemo.base;

import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class BasicModel implements Runnable {
    public void run() {
        try {
            ServerSocket ss =
                    new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            //建立新線程來handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[SystemConfig.INPUT_SIZE];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] input) {
            byte[] output=null;
            /* ... */
            return output;
        }
    }
}

對於每個請求都分發給一個線程,每一個線程中都獨自處理上面的流程。

tomcat服務器的早期版本確實是這樣實現的。

多線程併發模式,一個鏈接一個線程的優勢是:

必定程度上極大地提升了服務器的吞吐量,由於以前的請求在read阻塞之後,不會影響到後續的請求,由於他們在不一樣的線程中。這也是爲何一般會講「一個線程只能對應一個socket」的緣由。另外有個問題,若是一個線程中對應多個socket鏈接不行嗎?語法上確實能夠,可是實際上沒有用,每個socket都是阻塞的,因此在一個線程裏只能處理一個socket,就算accept了多個也沒用,前一個socket被阻塞了,後面的是沒法被執行到的。

多線程併發模式,一個鏈接一個線程的缺點是:

缺點在於資源要求過高,系統中建立線程是須要比較高的系統資源的,若是鏈接數過高,系統沒法承受,並且,線程的反覆建立-銷燬也須要代價。

改進方法是:

採用基於事件驅動的設計,當有事件觸發時,纔會調用處理器進行數據處理。使用Reactor模式,對線程的數量進行控制,一個線程處理大量的事件。

4. 單線程Reactor模型

 

Reactor模型的樸素原型

Java的NIO模式的Selector網絡通信,其實就是一個簡單的Reactor模型。能夠說是Reactor模型的樸素原型。

 static class Server
    {

        public static void testServer() throws IOException
        {

            // 一、獲取Selector選擇器
            Selector selector = Selector.open();

            // 二、獲取通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 3.設置爲非阻塞
            serverSocketChannel.configureBlocking(false);
            // 四、綁定鏈接
            serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));

            // 五、將通道註冊到選擇器上,並註冊的操做爲:「接收」操做
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // 六、採用輪詢的方式,查詢獲取「準備就緒」的註冊過的操做
            while (selector.select() > 0)
            {
                // 七、獲取當前選擇器中全部註冊的選擇鍵(「已經準備就緒的操做」)
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext())
                {
                    // 八、獲取「準備就緒」的時間
                    SelectionKey selectedKey = selectedKeys.next();

                    // 九、判斷key是具體的什麼事件
                    if (selectedKey.isAcceptable())
                    {
                        // 十、若接受的事件是「接收就緒」 操做,就獲取客戶端鏈接
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 十一、切換爲非阻塞模式
                        socketChannel.configureBlocking(false);
                        // 十二、將該通道註冊到selector選擇器上
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    else if (selectedKey.isReadable())
                    {
                        // 1三、獲取該選擇器上的「讀就緒」狀態的通道
                        SocketChannel socketChannel = (SocketChannel) selectedKey.channel();

                        // 1四、讀取數據
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int length = 0;
                        while ((length = socketChannel.read(byteBuffer)) != -1)
                        {
                            byteBuffer.flip();
                            System.out.println(new String(byteBuffer.array(), 0, length));
                            byteBuffer.clear();
                        }
                        socketChannel.close();
                    }

                    // 1五、移除選擇鍵
                    selectedKeys.remove();
                }
            }

            // 七、關閉鏈接
            serverSocketChannel.close();
        }

        public static void main(String[] args) throws IOException
        {
            testServer();
        }
    }

 

實際上的Reactor模式,是基於Java NIO的,在他的基礎上,抽象出來兩個組件——Reactor和Handler兩個組件:

(1)Reactor:負責響應IO事件,當檢測到一個新的事件,將其發送給相應的Handler去處理;新的事件包含鏈接創建就緒、讀就緒、寫就緒等。

(2)Handler:將自身(handler)與事件綁定,負責事件的處理,完成channel的讀入,完成處理業務邏輯後,負責將結果寫出channel。

 

4.1. 什麼是單線程Reactor呢?

 

以下圖所示:

wpsC334.tmp

這是最簡單的單Reactor單線程模型。Reactor線程是個多面手,負責多路分離套接字,Accept新鏈接,並分派請求到Handler處理器中。

下面的圖,來自於「Scalable IO in Java」,和上面的圖的意思,差很少。Reactor和Hander 處於一條線程執行。

wpsC345.tmp

順便說一下,能夠將上圖的accepter,看作是一種特殊的handler。

 

4.2. 單線程Reactor的參考代碼

「Scalable IO in Java」,實現了一個單線程Reactor的參考代碼,Reactor的代碼以下:

 

package com.crazymakercircle.ReactorModel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable
{
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException
    { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步處理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                {
                    //Reactor負責dispatch收到的事件
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //調用以前註冊的callback對象
        if (r != null)
        {
            r.run();
        }
    }

    // inner class
    class Acceptor implements Runnable
    {
        public void run()
        {
            try
            {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new Handler(selector, channel);
            } catch (IOException ex)
            { /* ... */ }
        }
    }
}

 

Handler的代碼以下:

 

package com.crazymakercircle.ReactorModel;


import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class Handler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = channel.register(selector, 0);

        //將Handler做爲callback對象
        sk.attach(this);

        //第二步,註冊Read就緒事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
        /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

        /* ... */
        return false;
    }

    void process()
    {
        /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void read() throws IOException
    {
        channel.read(input);
        if (inputIsComplete())
        {

            process();

            state = SENDING;
            // Normally also do first write now

            //第三步,接收write就緒事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write完就結束了, 關閉select key
        if (outputIsComplete())
        {
            sk.cancel();
        }
    }
}

 

這兩段代碼,是創建在JAVA NIO的基礎上的,這兩段代碼建議必定要看懂。能夠在IDE中去看源碼,這樣直觀感受更佳。

若是對NIO的Seletor不徹底瞭解,影響到上面的代碼閱讀,請閱讀瘋狂創客圈的Java NIO死磕 文章。

 

4.3. 單線程模式的缺點:

一、 當其中某個 handler 阻塞時, 會致使其餘全部的 client 的 handler 都得不到執行, 而且更嚴重的是, handler 的阻塞也會致使整個服務不能接收新的 client 請求(由於 acceptor 也被阻塞了)。 由於有這麼多的缺陷, 所以單線程Reactor 模型用的比較少。這種單線程模型不能充分利用多核資源,因此實際使用的很少。

二、所以,單線程模型僅僅適用於handler 中業務處理組件能快速完成的場景。

 

5. 多線程的Reactor

5.1. 基於線程池的改進

在線程Reactor模式基礎上,作以下改進:

(1)將Handler處理器的執行放入線程池,多線程進行業務處理。

(2)而對於Reactor而言,能夠仍爲單個線程。若是服務器爲多核的CPU,爲充分利用系統資源,能夠將Reactor拆分爲兩個線程。

一個簡單的圖以下:

image

5.2. 改進後的完整示意圖

下面的圖,來自於「Scalable IO in Java」,和上面的圖的意思,差很少,只是更加詳細。Reactor是一條獨立的線程,Hander 處於線程池中執行。

wpsC376.tmp

 

5.3. 多線程Reactor的參考代碼

「Scalable IO in Java」,的多線程Reactor的參考代碼,是基於單線程作一個線程池的改進,改進的Handler的代碼以下:

 

package com.crazymakercircle.ReactorModel;


import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MthreadHandler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    ExecutorService pool = Executors.newFixedThreadPool(2);
    static final int PROCESSING = 3;

    MthreadHandler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        selectionKey = channel.register(selector, 0);

        //將Handler做爲callback對象
        selectionKey.attach(this);

        //第二步,註冊Read就緒事件
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
       /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

       /* ... */
        return false;
    }

    void process()
    {
       /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }


    synchronized void read() throws IOException
    {
        // ...
        channel.read(input);
        if (inputIsComplete())
        {
            state = PROCESSING;
            //使用線程pool異步執行
            pool.execute(new Processer());
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write完就結束了, 關閉select key
        if (outputIsComplete())
        {
            selectionKey.cancel();
        }
    }

    synchronized void processAndHandOff()
    {
        process();
        state = SENDING;
        // or rebind attachment
        //process完,開始等待write就緒
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable
    {
        public void run()
        {
            processAndHandOff();
        }
    }

}

 

Reactor 類沒有大的變化,參考前面的代碼。

6. Reactor持續改進

對於多個CPU的機器,爲充分利用系統資源,將Reactor拆分爲兩部分。代碼以下:

 

package com.crazymakercircle.ReactorModel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class MthreadReactor implements Runnable
{

    //subReactors集合, 一個selector表明一個subReactor
    Selector[] selectors=new Selector[2];
    int next = 0;
    final ServerSocketChannel serverSocket;

    MthreadReactor(int port) throws IOException
    { //Reactor初始化
        selectors[0]=Selector.open();
        selectors[1]= Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);


        //分步處理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                for (int i = 0; i <2 ; i++)
                {
                    selectors[i].select();
                    Set selected =  selectors[i].selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext())
                    {
                        //Reactor負責dispatch收到的事件
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }

            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //調用以前註冊的callback對象
        if (r != null)
        {
            r.run();
        }
    }


    class Acceptor { // ...
        public synchronized void run() throws IOException
        {
            SocketChannel connection =
                    serverSocket.accept(); //主selector負責accept
            if (connection != null)
            {
                new Handler(selectors[next], connection); //選個subReactor去負責接收到的connection
            }
            if (++next == selectors.length) next = 0;
        }
    }
}

 

7. Reactor編程的優勢和缺點

6.1. 優勢

1)響應快,沒必要爲單個同步時間所阻塞,雖然Reactor自己依然是同步的;

2)編程相對簡單,能夠最大程度的避免複雜的多線程及同步問題,而且避免了多線程/進程的切換開銷;

3)可擴展性,能夠方便的經過增長Reactor實例個數來充分利用CPU資源;

4)可複用性,reactor框架自己與具體事件處理邏輯無關,具備很高的複用性;

6.2. 缺點

1)相比傳統的簡單模型,Reactor增長了必定的複雜性,於是有必定的門檻,而且不易於調試。

2)Reactor模式須要底層的Synchronous Event Demultiplexer支持,好比Java中的Selector支持,操做系統的select系統調用支持,若是要本身實現Synchronous Event Demultiplexer可能不會有那麼高效。

3) Reactor模式在IO讀寫數據時仍是在同一個線程中實現的,即便使用多個Reactor機制的狀況下,那些共享一個Reactor的Channel若是出現一個長時間的數據讀寫,會影響這個Reactor中其餘Channel的相應時間,好比在大文件傳輸時,IO操做就會影響其餘Client的相應時間,於是對這種操做,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用改進版的Reactor模式如Proactor模式。

 

在開啓Netty源碼前,上面的經典代碼,必定要看懂哦!

 

 

瘋狂創客圈 實戰計劃
  • Netty 億級流量 高併發  IM後臺 開源項目實戰

  • Netty 源碼、原理、JAVA NIO 原理

  • Java 面試題 一網打盡

  • 瘋狂創客圈 【 博客園 總入口 】

相關文章
相關標籤/搜索