深刻的聊聊 Java NIO

趁着三天假期,把Java NIOReactor模式整理總結了下,文章特別細節的知識點沒有寫,如一些API的具體實現。相似數據讀到Buffer後再寫出時,爲何須要復位操做,這些都屬於NIO基礎知識,是學習Reactor模式的前置條件。java

1. 原始Ractor模式

相關組件的解釋

  1. Handle(句柄或是描述符):本質上表示一種資源,是操做系統提供的;該資源用於表示一個個事件,好比文件描述符,或者是針對於網絡編程中的Socket描述符。事件既能夠來自於外部,也能夠來自內部;外部事件好比說客戶端的鏈接請求,客戶端發送過來數據等;內部事件好比說操縱系統產生的定時器事件等。它本質上就是一個文件描述符。Handle是事件產生的發源地。
  2. Synchronous Event Demultiplexer(同步事件分離器):它自己是一個系統調用,用於等待事件的發生(事件多是一個,也多是多個)。調用方在調用它的時候會被阻塞,一直阻塞到同步事件分離器上有事件產生爲止。對於Linux來講,同步事件分離器指的就是經常使用的I/O多路複用機制,好比說selectpollepoll等。在Java NIO中,同步事件分離器對應的組件就是Selector;對應的阻塞方法就是select方法。
  3. Event Handler(事件處理器) 自己由多個回調方法構成,這些回調構成了與應用相關的對於某個事件的反饋機制。Netty相比於Java NIO來講,在事件處理器這個角色上進行一個升級,它爲咱們開發者提供了大量的回調方法,供咱們在待定事件產生時實現相應的回調方法進行業務邏輯的處理。
  4. Concrete Event Handler(具體事件處理器):它自己實現了事件處理所提供的各個回調方法,從而實現了特定於業務的邏輯。它本質上就是咱們所編寫的一個個的處理器實現。
  5. Initiation Dispatcher(初始分發器):實際上就是Reactor角色。它自己定義了一些規範,這些規範用於控制事件的調度方式,同時又提供了應用進行事件處理器的註冊、刪除等。Initiation Dispatcher會經過同步事件分離器來等待事件的發生,一旦事件發生,Initiation Dispatcher首先會分離出每個事件,而後調用事件處理器,最後調用相關的回調方法來處理事件。

執行流程分析

  1. 當應用像Initiation Dispatcher註冊具體的事件處理器時,應用會標識出事件處理器但願Initiation Dispatcher在某個事件發生時向其通知該事件,該事件與Handle關聯。
  2. Initiation Dispatcher會要求每一個事件向其傳遞內部的Handle。該Handle向操做系統標識了事件處理器。
  3. 當全部事件處理器註冊完畢後,應用會調用handle_events方法來啓動Initiation Dispatcher的事件循環。這時,Initiation Dispatcher會將每一個註冊的事件管理器的Handle合併起來,並使用同步事件分離器等待這些事件的發生。好比說,TCP協議層使用select同步事件分離器操做來等待客戶端發送的數據到達鏈接的socker handle上。
  4. 當與某個事件源對應的Handle變爲ready狀態時(好比說,TCP socker變爲等待讀狀態時),同步事件分離器就會通知Initiation Dispatcher
  5. Initiation Dispatcher會觸發事件處理器的回調方法,從而響應這個處於ready狀態的HandleInitiation Dispatcher會回調事件處理器的handle_events回調方法來執行特定於應用的功能(開發者本身所編寫的功能),從而響應這個事件。所發生的事件類型能夠做爲該方法參數並被該方法內部使用來執行額外的特定於服務的功能。

以上描述的內容彷佛和本文的標題不大,其實否則,它正是下面介紹的內容的開端。react

2. 經過一個例子拉近與Java NIO的距離

/**
 * @Author CoderJiA
 * @Description NIOServer
 * @Date 13/2/19 下午4:59
 **/
public class NIOServer {

    public static void main(String[] args) throws Exception{

        // 1.建立ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2.建立Selector,並ServerSocketChannel註冊OP_ACCEPT事件,接收鏈接。
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3.開啓輪詢
        while (selector.select() > 0) {
            // 從selector全部事件就緒的key,並遍歷處理。
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.forEach(selectionKey -> {
                SocketChannel client;
                try {
                    if (selectionKey.isAcceptable()) {  // 接受事件就緒
                        // 獲取serverSocketChannel
                        ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
                        // 接收鏈接
                        client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {  // 讀事件就緒
                        // 獲取socketChannel
                        client = (SocketChannel) selectionKey.channel();
                        // 建立buffer,並將獲取socketChannel中的數據讀入到buffer中
                        ByteBuffer readBuf = ByteBuffer.allocate(1024);
                        int readCount = client.read(readBuf);
                        if (readCount <= 0) {
                            return;
                        }
                        Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
                        readBuf.flip();
                        System.out.println(String.valueOf(charset.decode(readBuf).array()));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                selectionKeys.remove(selectionKey);
            });
        }

    }
複製代碼

經過這個例子,與原始Reactor模式相對應的理解,好比同步事件分離器對應着Selectorselect()方法,再好比ServerSocketChannel註冊給SelectorOP_ACCEPT,還有SocketChannelOP_READOP_WRITE,這些事件保存在操做系統上,其實就是原始Reactor中的Handlegit

四個重要api

  1. Channel:Connections to files,sockets etc that support non-blocking reads.
  2. Buffer:Array-like objects that can be directly read or written by Channels.
  3. Selector:Tell which of a set of Channels have IO events.
  4. SelectionKeys:Maintain IO event status and bingdings.

3.用Java NIO對Reactor模式的應用。

3.1 Single threaded version

/**
 * @Author CoderJiA
 * @Description Reactor
 * @Date 5/4/19 下午2:25
 **/
public abstract class Reactor implements Runnable{


    protected final Selector selector;
    protected final ServerSocketChannel serverSocket;

    protected final long port;
    protected final long timeout;

    public Reactor(int port, long timeout) throws IOException {
        this.port = port;
        this.timeout = timeout;
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket
                .socket()
                .bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(newAcceptor(selector));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                if (selector.select(timeout) > 0) {
                    Set<SelectionKey> selected = selector.selectedKeys();
                    selected.forEach(sk -> {
                        dispatch(sk);
                        selected.remove(sk);
                    });
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey sk) {
        Runnable r = (Runnable)(sk.attachment());
        if (Objects.nonNull(r)) {
            r.run();
        }
    }

    public abstract Acceptor newAcceptor(Selector selector);

}

複製代碼
/**
 * @Author CoderJiA
 * @Description Acceptor
 * @Date 5/4/19 下午2:58
 **/
public class Acceptor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocket;

    public Acceptor(Selector selector, ServerSocketChannel serverSocket) {
        this.selector = selector;
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        try {
            SocketChannel socket = serverSocket.accept();
            if (Objects.nonNull(socket)) {
                new Handler(selector, socket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
複製代碼
/**
 * @Author CoderJiA
 * @Description Handler
 * @Date 5/4/19 下午4:25
 **/
public class Handler implements Runnable {

    private static final int MB = 1024 * 1024;

    protected final SocketChannel socket;
    protected final SelectionKey sk;
    protected final ByteBuffer input = ByteBuffer.allocate(MB);
    protected final ByteBuffer output = ByteBuffer.allocate(MB);

    private static final int READING = 0, SENDING = 1;
    private int state = READING;

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.socket = socket;
        socket.configureBlocking(false);
        sk = socket.register(selector, SelectionKey.OP_READ);
        sk.attach(this);
    }

    @Override
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            state = SENDING;
            sk.interestOps(SelectionKey.OP_WRITE);
        }
        input.clear();
    }

    private void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            sk.cancel();
        }
    }

    private boolean inputIsComplete() {
        return input.position() > 0;
    }

    private boolean outputIsComplete() {
        return !output.hasRemaining();
    }

}
複製代碼
/**
 * @Author CoderJiA
 * @Description EchoReactor
 * @Date 5/4/19 下午5:01
 **/
public class EchoReactor extends Reactor {

    private static final int PORT = 9999;
    private static final long TIME_OUT = TimeUnit.MILLISECONDS.toMillis(10);

    public EchoReactor(int port, long timeout) throws IOException {
        super(port, timeout);
    }

    @Override
    public Acceptor newAcceptor(Selector selector) {
        return new Acceptor(selector, this.serverSocket);
    }

    public static void main(String[] args) throws IOException {
        new EchoReactor(PORT, TIME_OUT).run();
    }

}
複製代碼
核心組件組件分析
  1. Reactor等同於原始Reactor模式Initiation Dispatcher,它負責全部就緒事件統一分發到事件處理器,如AcceptorHanlder
  2. Acceptor用於將接收到的SocketChannel交給Handler處理。
  3. Handler處理讀寫操做。

這是Reactor的單線程版本,這個版本一個線程處理客戶端的接收數據處理以及讀寫操做,數據處理每每就是咱們實際開發中的業務處理,是比較耗時的。若是一個處理過程處於阻塞,那麼這個模型所表現出的就處於阻塞,因此一個數據處理的阻塞會致使不能處理客戶端鏈接的接收。所以衍生出來下面的多工做線程版原本優化Handlergithub

3.2 Worker Threads version

調整下Handler編程

package cn.coderjia.nio.douglea.reactor2;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author CoderJiA
 * @Description Handler
 * @Date 5/4/19 下午4:25
 **/
public class Handler implements Runnable {

    private static final int MB = 1024 * 1024;

    protected final SocketChannel socket;
    protected final SelectionKey sk;
    protected final ByteBuffer input = ByteBuffer.allocate(MB);
    protected final ByteBuffer output = ByteBuffer.allocate(MB);

    private static final int READING = 0, SENDING = 1, PROCESSING = 3;
    private int state = READING;

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.socket = socket;
        socket.configureBlocking(false);
        sk = socket.register(selector, SelectionKey.OP_READ);
        sk.attach(this);
    }

    @Override
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            EXECUTOR_SERVICE.execute(new Processer());
        }
        input.clear();
    }

    private void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            sk.cancel();
        }
    }

    private void process() {
        System.out.println("Handler.process()...");
    }

    private boolean inputIsComplete() {
        return input.position() > 0;
    }

    private boolean outputIsComplete() {
        return !output.hasRemaining();
    }

    class Processer implements Runnable {
        public void run() {
            processAndHandOff();
        }
    }

    synchronized void processAndHandOff() {
        process();
        state = SENDING;
        sk.interestOps(SelectionKey.OP_WRITE);
    }

}
複製代碼

Handler多工做線程版本將耗時的process(),建立線程去處理。這個版本Reactor既負責客戶端的接收事件,又負責讀寫事件,由於對於高併發場景鏈接數巨大,Reactor可能有時候會力不從心。所以衍生出下面的主從Reactor模型。api

3.3 Multiple Reactors Version

調整Acceptor

/**
 * @Author CoderJiA
 * @Description Acceptor3
 * @Date 6/4/19 下午6:51
 **/
public class Acceptor3 implements Runnable {

    private final ServerSocketChannel serverSocket;

    public Acceptor3(ServerSocketChannel serverSocket) {
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        try {
            SocketChannel socket = serverSocket.accept();
            if (Objects.nonNull(socket)) {
                new Handler(EchoReactor.nextSubReactor().selector, socket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
}
複製代碼

調整Reactorbash

/**
 * @Author CoderJiA
 * @Description Reactor3
 * @Date 6/4/19 下午6:51
 **/
public abstract class Reactor3 implements Runnable {


    protected Selector selector;
    protected ServerSocketChannel serverSocket;

    protected final int port;
    protected final long timeout;
    protected final boolean isMainReactor;

    public Reactor3(int port, long timeout, boolean isMainReactor) {
        this.port = port;
        this.timeout = timeout;
        this.isMainReactor = isMainReactor;
    }

    @Override
    public void run() {
        try {
            init();
            while (!Thread.interrupted()) {
                if (selector.select(timeout) > 0) {
                    System.out.println("isMainReactor:" + isMainReactor);
                    Set<SelectionKey> selected = selector.selectedKeys();
                    selected.forEach(sk -> {
                        dispatch(sk);
                        selected.remove(sk);
                    });
                    selected.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void init() throws IOException {
        selector = Selector.open();
        if (isMainReactor) {
            serverSocket = ServerSocketChannel.open();
            serverSocket
                    .socket()
                    .bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(false);
            SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(newAcceptor());
        }
    }

    private void dispatch(SelectionKey sk) {
        Runnable r = (Runnable)(sk.attachment());
        if (Objects.nonNull(r)) {
            r.run();
        }
    }

    public abstract Acceptor3 newAcceptor();

}

複製代碼
/**
 * @Author CoderJiA
 * @Description EchoReactor
 * @Date 6/4/19 下午5:35
 **/
public class EchoReactor extends Reactor3 {

    private static final int PORT = 9999;
    private static final long TIME_OUT = TimeUnit.MILLISECONDS.toMillis(10);

    private static final int SUB_REACTORS_SIZE = 2;
    private static final Reactor3[] SUB_REACTORS = new Reactor3[SUB_REACTORS_SIZE];
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger(0);

    static {
        // 初始化子Reactor
        IntStream.range(0, SUB_REACTORS_SIZE).forEach(i -> SUB_REACTORS[i] = new EchoReactor(PORT, TIME_OUT, false));
    }

    public static Reactor3 nextSubReactor(){

        int curIdx = NEXT_INDEX.getAndIncrement();

        if(curIdx >= SUB_REACTORS_SIZE){
            NEXT_INDEX.set(0);
            curIdx = 0;
        }
        return SUB_REACTORS[(curIdx % SUB_REACTORS_SIZE)];
    }

    public EchoReactor(int port, long timeout, boolean isMainReactor) {
        super(port, timeout, isMainReactor);
    }

    @Override
    public Acceptor3 newAcceptor() {
        return new Acceptor3(this.serverSocket);
    }

    public static void main(String[] args) {

        Reactor3 mainReactor = new EchoReactor(PORT, TIME_OUT, true);

        // 啓動主Reactor
        new Thread(mainReactor).start();

        // 啓動子Reactor
        IntStream.range(0, SUB_REACTORS_SIZE).forEach(i -> new Thread(SUB_REACTORS[i]).start());
    }

}
複製代碼

主從Reactor模型,主Reactor用於處理客戶端鏈接的接收轉發給Acceptor處理,子Reactor處理讀寫事件的接收轉發給Handler處理。網絡

參考文章

Scalable IO in Java併發

源碼地址

github.com/coderjia061…socket

相關文章
相關標籤/搜索