簡單地聊聊NIO的三種線程模型

這篇文章主要是爲接下來的Netty源碼閱讀做預熱,Netty的線程模型與這篇文章介紹的三種模型相關。Netty沒有第二種,但是在主從Reactor多工作線程模型的基礎上又多了一個多主線程模型。我所有的NIO相關文章,都是爲接下來的Netty源碼閱讀系列做準備。

NIO對應的三種線程設計模型

  • Reactor單線程模型
  • Reactor多工作線程模型
  • 主從Reactor多工作線程模型

Reactor單線程模型

/**
 * Single-threaded Reactor: one thread owns the {@link Selector}, accepts new
 * connections and reads from every client channel.
 *
 * <p>Created 13/2/19, 4:59 PM.
 *
 * @author CoderJiA
 */
public class NIOServer {

    /**
     * Binds a non-blocking server socket to port 8899 and runs the event loop on the
     * calling thread.
     *
     * @param args unused
     * @throws Exception if the server channel or selector cannot be set up, or if a
     *                   select operation fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Create the ServerSocketChannel in non-blocking mode and bind it.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2. Create the Selector and register the server channel for OP_ACCEPT.
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3. Event loop. select() may legitimately return 0, so its return value must
        //    not be used as the loop condition or the server would silently stop.
        while (selector.isOpen()) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            for (SelectionKey selectionKey : selectionKeys) {
                dispatch(selector, selectionKey);
            }
            // Clear after the pass: removing keys while iterating the set is a
            // concurrent modification, and the selector never removes them itself.
            selectionKeys.clear();
        }
    }

    /** Routes one ready key to the accept or read handler. */
    private static void dispatch(Selector selector, SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            accept(selector, (ServerSocketChannel) selectionKey.channel());
        } else if (selectionKey.isReadable()) {
            read((SocketChannel) selectionKey.channel());
        }
    }

    /** Accepts a pending connection and registers it for OP_READ on the same selector. */
    private static void accept(Selector selector, ServerSocketChannel server) {
        SocketChannel client = null;
        try {
            client = server.accept();
            if (client == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(client);
        }
    }

    /** Reads up to 1024 bytes from the client and prints them decoded as UTF-8. */
    private static void read(SocketChannel client) {
        try {
            ByteBuffer readBuf = ByteBuffer.allocate(1024);
            int readCount = client.read(readBuf);
            if (readCount < 0) {
                // End of stream: close the channel (which also cancels its key),
                // otherwise it would be reported readable on every select().
                client.close();
                return;
            }
            if (readCount == 0) {
                return;
            }
            readBuf.flip();
            // toString() yields exactly the decoded characters; array() would expose
            // the whole backing array, including unused trailing slots.
            System.out.println(StandardCharsets.UTF_8.decode(readBuf).toString());
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(client);
        }
    }

    /** Closes the channel if present, ignoring a failure to close. */
    private static void closeQuietly(SocketChannel client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (IOException ignored) {
            // Nothing further can be done for a channel that fails to close.
        }
    }

}
複製代碼

Reactor多工作線程模型

/**
 * Reactor with a worker pool: one thread owns the {@link Selector} and accepts
 * connections, while reads are delegated to the {@code Processor} attached to each
 * client key.
 *
 * @author CoderJiA
 */
public class NIOServer {

    /**
     * Binds a non-blocking server socket to port 8899 and runs the event loop on the
     * calling thread.
     *
     * @param args unused
     * @throws Exception if the server channel or selector cannot be set up, or if a
     *                   select operation fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Create the ServerSocketChannel in non-blocking mode and bind it.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2. Create the Selector and register the server channel for OP_ACCEPT.
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3. Event loop. select() may legitimately return 0, so its return value must
        //    not be used as the loop condition or the server would silently stop.
        while (selector.isOpen()) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            for (SelectionKey selectionKey : selectionKeys) {
                dispatch(selector, selectionKey);
            }
            // Clear after the pass: removing keys while iterating the set is a
            // concurrent modification, and the selector never removes them itself.
            selectionKeys.clear();
        }

    }

    /** Routes one ready key: accepts a connection or hands the read to its Processor. */
    private static void dispatch(Selector selector, SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            accept(selector, (ServerSocketChannel) selectionKey.channel());
        } else if (selectionKey.isReadable()) {
            Processor processor = (Processor) selectionKey.attachment();
            processor.process(selectionKey);
        }
    }

    /** Accepts a pending connection and registers it for OP_READ with a new Processor. */
    private static void accept(Selector selector, ServerSocketChannel server) {
        SocketChannel client = null;
        try {
            client = server.accept();
            if (client == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            client.configureBlocking(false);
            SelectionKey readKey = client.register(selector, SelectionKey.OP_READ);
            readKey.attach(new Processor());
        } catch (IOException e) {
            e.printStackTrace();
            if (client != null) {
                try {
                    client.close();
                } catch (IOException ignored) {
                    // Nothing further can be done for a channel that fails to close.
                }
            }
        }
    }

}
複製代碼
/**
 * Performs channel reads on a shared fixed pool of 16 worker threads so that the
 * thread driving the selector is not occupied by the read itself.
 *
 * @author CoderJiA
 */
public class Processor {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(16);

    /**
     * Queues one read of the channel behind {@code selectionKey} on the worker pool.
     *
     * <p>NOTE(review): the key stays registered for OP_READ while the task is queued,
     * so the caller's selector may report the same unread data again and queue more
     * than one task for a channel; surplus tasks simply read zero bytes and return.
     *
     * @param selectionKey a key whose channel is a {@link SocketChannel} ready for reading
     */
    public void process(SelectionKey selectionKey) {
        EXECUTOR_SERVICE.execute(() -> readOnce(selectionKey));
    }

    /** Reads up to 1024 bytes from the key's channel and prints them decoded as UTF-8. */
    private static void readOnce(SelectionKey selectionKey) {
        SocketChannel client = (SocketChannel) selectionKey.channel();
        if (!client.isOpen()) {
            // An earlier task already closed this channel.
            return;
        }
        try {
            ByteBuffer readBuf = ByteBuffer.allocate(1024);
            int readCount = client.read(readBuf);
            if (readCount < 0) {
                // End of stream: close the channel (which also cancels its key),
                // otherwise it would be reported readable on every select().
                client.close();
                return;
            }
            if (readCount == 0) {
                return;
            }
            readBuf.flip();
            // toString() yields exactly the decoded characters; array() would expose
            // the whole backing array, including unused trailing slots.
            System.out.println(StandardCharsets.UTF_8.decode(readBuf).toString());
        } catch (Exception e) {
            // Worker-task boundary: report the failure and drop the broken connection.
            e.printStackTrace();
            try {
                client.close();
            } catch (Exception ignored) {
                // Nothing further can be done for a channel that fails to close.
            }
        }
    }

}
複製代碼

主從Reactor多工作線程模型

/**
 * Main/sub Reactor: the main thread only accepts connections and hands each accepted
 * channel to one of several {@code Processor} sub-reactors in round-robin order.
 *
 * <p>Created 13/2/19, 4:59 PM.
 *
 * @author CoderJiA
 */
public class NIOServer {

    /**
     * Binds a non-blocking server socket to port 8899, creates one {@code Processor}
     * per available CPU and runs the accept loop on the calling thread.
     *
     * @param args unused
     * @throws Exception if the server channel or selector cannot be set up, or if a
     *                   select operation fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Create the ServerSocketChannel in non-blocking mode and bind it.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2. Create the Selector and register the server channel for OP_ACCEPT.
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3. Create the sub-reactors.
        int coreProcessorNum = Runtime.getRuntime().availableProcessors();
        Processor[] processors = new Processor[coreProcessorNum];
        for (int i = 0; i < processors.length; i++) {
            processors[i] = new Processor();
        }

        // 4. Accept loop. select() may legitimately return 0, so its return value must
        //    not be used as the loop condition or the server would silently stop.
        int nextProcessor = 0;
        while (selector.isOpen()) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            for (SelectionKey selectionKey : selectionKeys) {
                if (!selectionKey.isValid() || !selectionKey.isAcceptable()) {
                    continue;
                }
                ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                if (handOff(server, processors[nextProcessor])) {
                    // Plain modulo keeps the rotation strictly round-robin.
                    nextProcessor = (nextProcessor + 1) % coreProcessorNum;
                }
            }
            // Clear after the pass: removing keys while iterating the set is a
            // concurrent modification, and the selector never removes them itself.
            selectionKeys.clear();
        }

    }

    /**
     * Accepts one pending connection and passes it to the given processor.
     *
     * @return {@code true} if a connection was accepted and handed over
     */
    private static boolean handOff(ServerSocketChannel server, Processor processor) {
        SocketChannel client = null;
        try {
            client = server.accept();
            if (client == null) {
                // Non-blocking accept: the pending connection is already gone.
                return false;
            }
            client.configureBlocking(false);
            processor.addSocketChannel(client);
            processor.wakeup();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            if (client != null) {
                try {
                    client.close();
                } catch (IOException ignored) {
                    // Nothing further can be done for a channel that fails to close.
                }
            }
            return false;
        }
    }

}
複製代碼
/**
 * Sub-reactor: owns its own {@link Selector} and runs a read loop for the client
 * channels registered with it, on a thread taken from a shared pool.
 *
 * @author CoderJiA
 */
public class Processor {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    /** Looked up once instead of on every read. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final Selector selector;

    /**
     * Opens the selector and starts the read loop on the shared pool.
     *
     * @throws IllegalStateException if the selector cannot be opened; failing here
     *                               avoids a later NullPointerException on first use
     */
    public Processor() {
        try {
            this.selector = Selector.open();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to open selector for processor", e);
        }
        start();
    }

    /**
     * Registers the channel with this processor's selector for OP_READ.
     *
     * @param socketChannel a non-blocking client channel
     * @throws ClosedChannelException if the channel is already closed
     */
    public void addSocketChannel(SocketChannel socketChannel) throws ClosedChannelException {
        System.out.println("processor addSocketChannel...");
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    /** Wakes the read loop so that a newly registered channel is picked up promptly. */
    public void wakeup() {
        System.out.println("processor wakeup...");
        this.selector.wakeup();
    }

    /** Submits the read loop to the shared pool. */
    private void start() {
        System.out.println("processor start...");
        EXECUTOR_SERVICE.execute(this::run);
    }

    /** Read loop: waits up to 100 ms per pass and services every selected key. */
    private void run() {
        System.out.println("processor run...");
        try {
            while (selector.isOpen()) {
                selector.select(100);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    handle(selectionKey);
                }
                // Clear after the pass: removing keys while iterating the set is a
                // concurrent modification, and the selector never removes them itself.
                selectionKeys.clear();
            }
        } catch (Exception e) {
            // Thread boundary: a failing select ends this processor's loop.
            e.printStackTrace();
        }

    }

    /** Reads up to 1024 bytes from a readable key's channel and prints them as UTF-8. */
    private void handle(SelectionKey selectionKey) {
        SocketChannel client = (SocketChannel) selectionKey.channel();
        try {
            if (!selectionKey.isValid() || !selectionKey.isReadable()) {
                return;
            }
            ByteBuffer readBuf = ByteBuffer.allocate(1024);
            int readCount = client.read(readBuf);
            if (readCount < 0) {
                // End of stream: close the channel (which also cancels its key),
                // otherwise it would be reported readable on every select().
                client.close();
                return;
            }
            if (readCount == 0) {
                return;
            }
            readBuf.flip();
            // toString() yields exactly the decoded characters; array() would expose
            // the whole backing array, including unused trailing slots.
            System.out.println(UTF_8.decode(readBuf).toString());
        } catch (Exception e) {
            // Report the failure and drop the broken connection so it cannot spin.
            e.printStackTrace();
            try {
                client.close();
            } catch (Exception ignored) {
                // Nothing further can be done for a channel that fails to close.
            }
        }
    }
}
複製代碼

參考文章地址:www.jasongj.com/java/nio_reactor/

代碼地址: github.com/coderjia061…git

相關文章
相關標籤/搜索