Reactor線程模型及其在Netty中的應用

什麼是Reactor線程模型

Java中線程模型大體能夠分爲:java

  1. 單線程模型
  2. 多線程模型
  3. 線程池模型(executor)
  4. Reactor線程模型

單線程模型中,server端使用一個線程來處理全部的請求,全部的請求必須串行化處理,效率低下。 多線程模型中,server端會爲每一個請求分配一個線程去處理請求,相對單線程模型而言多線程模型效率更高,可是多線程模型的缺點也很明顯:server端爲每一個請求都開闢一個線程來處理請求,若是請求數量很大,則會形成大量線程被建立,形成內存溢出。Java中線程是比較昂貴的對象。線程的數量不該該無限量的增大,當線程數超過必定數目後,增長線程不只不能提升效率,反而會下降效率。 藉助"對象複用"的思想,線程池應運而生,線程池中通常有必定數目的線程,當請求數目超過線程數以後須要排隊等待。這樣就避免了線程會不停地增加。這裏不打算對Java的線程池作過多介紹,有興趣的能夠去看我以前的文章:java線程池git

Reactor是一種處理模式。Reactor模式是處理併發I/O比較常見的一種模式,用於同步I/O,中心思想是將全部要處理的IO事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上;一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。程序員

Reactor也是一種實現機制。Reactor利用事件驅動機制實現,和普通函數調用的不一樣之處在於:應用程序不是主動的調用某個API完成處理,而是偏偏相反,Reactor逆置了事件處理流程,應用程序須要提供相應的接口並註冊到Reactor上,若是相應的事件發生,Reactor將主動調用應用程序註冊的接口,這些接口又稱爲「回調函數」。用「好萊塢原則」來形容Reactor再合適不過了:不要打電話給咱們,咱們會打電話通知你。github

爲何須要Reactor模型

Reactor模型實質上是對I/O多路複用的一層包裝,理論上來講I/O多路複用的效率已經夠高了,爲何還須要Reactor模型呢?答案是I/O多路複用雖然性能已經夠高了,可是編碼複雜,在工程效率上仍是過低。所以出現了Reactor模型。編程

一個個網絡請求可能涉及到多個I/O請求,相比傳統的單線程完整處理請求生命期的方法,I/O複用在人的大腦思惟中並不天然,由於,程序員編程中,處理請求A的時候,假定A請求必須通過多個I/O操做A1-An(兩次IO間可能間隔很長時間),每通過一次I/O操做,再調用I/O複用時,I/O複用的調用返回裏,很是可能再也不有A,而是返回了請求B。即請求A會常常被請求B打斷,處理請求B時,又被C打斷。這種思惟下,編程容易出錯。後端

Reactor模型

通常來講處理一個網絡請求須要通過如下五個步驟:數組

  1. 讀取請求數據(read request)
  2. 解碼數據(decode request)
  3. 計算,生成響應(compute)
  4. 編碼響應(encode response)
  5. 發送響應數據(send response) 以下圖所示:
ClassicServiceDesign
不難看出,上面的模型中讀取請求數據和發送響應和業務處理邏輯耦合在一塊兒,都是用一個handler線程處理,當發生讀寫事件時線程將被阻塞,沒法處理其餘的事情。既然不能耦合在一塊兒,那天然解決方案是將讀寫操做和其餘三個步驟進行分離:有專門的線程或者線程池負責鏈接請求,而後使用多路複用器來監測Socket上的讀寫事件,其餘的處理委派給業務線程池進行處理。讀寫操做不阻塞主線程。

Reactor模型有三種線程模型:bash

  1. 單線程模型
  2. 多線程模型(單Reactor)
  3. 多線程模型(多Reactor)

單線程Reactor模型

單線程模型中Reactor既負責Accept新的鏈接請求,又負責分派請求到具體的handler中進行處理,通常不使用這種模型,由於單線程效率比較低下。網絡

BasicReactorPattern

下面是基於Java NIO單線程Reactor模型的實現:多線程

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException { // Reactor設置
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT); // 監聽Socket鏈接事件
        sk.attach(new Acceptor());
    }

    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }
}


final class Handler implements Runnable {
    static final int READING = 0, SENDING = 1;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);
    ByteBuffer output = ByteBuffer.allocate(1024);
    int state = READING;

    Handler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() { /* ... */ }

    boolean outputIsComplete() { /* ... */ }

    void process() { /* ... */ }

    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }
}
複製代碼

多線程模型(單Reactor)

該模型在事件處理器(Handler)鏈部分採用了多線程(線程池),也是後端程序經常使用的模型。模型圖以下:

WorkerThreadPools
其實現大體代碼以下: ``` class Handler implements Runnable { // 使用線程池 static PooledExecutor pool = new PooledExecutor(...); static final int PROCESSING = 3; // ... synchronized void read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } } ```

多線程模型(多Reactor)

比起多線程單Rector模型,它是將Reactor分紅兩部分,mainReactor負責監聽並Accept新鏈接,而後將創建的socket經過多路複用器(Acceptor)分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網絡數據;業務處理功能,其交給worker線程池完成。一般,subReactor個數上可與CPU個數等同。其模型圖以下:

UsingMultiplyReactors

Netty線程模型

Netty的線程模型相似於Reactor模型。Netty中ServerBootstrap用於建立服務端,下圖是它的結構:

Netty-ServerBootstrap
ServerBootstrap繼承自AbstractBootstrap,AbstractBootstrap中的group屬性就是Netty中的Acceptor,用於接受請求。而ServerBootstrap中的childGroup對應於Reactor模型中的worker線程池。 請求過來後Netty從group線程池中選出一個線程來創建鏈接,鏈接創建後委派給childGroup中的worker線程處理。 服務端線程模型工做原理以下圖:
Netty服務端線程工做流程

下面是一個完整的Netty服務端的例子:

public class TimeServer {
    public void bind(int port) {
        // Netty的多Reactor線程模型,bossGroup是Acceptor線程池,用於接受鏈接。workGroup是Worker線程池,處理業務。
        // bossGroup是Acceptor線程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // workGroup是Worker線程池
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 綁定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
}

public class TimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // msg轉Buf
        ByteBuf buf = (ByteBuf) msg;
        // 建立緩衝中字節數的字節數組
        byte[] req = new byte[buf.readableBytes()];
        // 寫入數組
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        String currenTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(
                System.currentTimeMillis()).toString() : "BAD ORDER";
        // 將要返回的信息寫入Buffer
        ByteBuf resp = Unpooled.copiedBuffer(currenTime.getBytes());
        // buffer寫入通道
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // write讀入緩衝數組後經過invoke flush寫入通道
        ctx.flush();
    }
}
複製代碼

參考資料

Scalable IO in Java
Netty 系列之 Netty 線程模型

相關文章
相關標籤/搜索