java NIO AIO

NIO簡介

NIO究竟是什麼的簡稱?有人喜歡稱之爲New IO,由於它相對於之前的IO是新增的,因此官方稱之爲New IO。可是,因爲以前的IO類庫是阻塞的,New IO就是要讓Java可以支持非阻塞IO,因此,也有人喜歡稱之爲Non-block IO。 java

1.緩衝區Buffer

Buffer 是一個對象, 它包含一些要寫入或者剛讀出的數據。 在 NIO 中加入 Buffer 對象,體現了新庫與原 I/O 的一個重要區別。在面向流的 I/O 中,您將數據直接寫入或者將數據直接讀到 Stream 對象中。
在 NIO 庫中,全部數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的。在寫入數據時,它是寫入到緩衝區中的。任什麼時候候訪問 NIO 中的數據,您都是將它放到緩衝區中。
緩衝區實質上是一個數組。一般它是一個字節數組,可是也可使用其餘種類的數組。可是一個緩衝區不 僅僅 是一個數組。緩衝區提供了對數據的結構化訪問,並且還能夠跟蹤系統的讀/寫進程。
最經常使用的緩衝區類型是 ByteBuffer。一個 ByteBuffer 能夠在其底層字節數組上進行 get/set 操做(即字節的獲取和設置)。react

2.通道Channel

Channel是一個通道,能夠經過它讀取與寫入數據,它就像自來水管同樣,網絡數據經過Channel讀取和寫入。通道與流的不一樣之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),並且通道能夠用於讀、寫或者同時用於讀寫。
由於Channel是全雙工的,因此它能夠比流更好的映射底層操做系統的API。特別是在UNIX網絡編程模型中,底層操做系統的通道都是全雙工的,同時支持讀寫操做。編程

3.多路複用器Selector

多路複用器提供選擇已經就緒的任務的能力。簡單來說,Selector會不斷地輪詢註冊在其上的Channel,若是某個channel上有新的TCP鏈接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。
一個多路複用器Selector能夠同時輪詢多個Channel,因爲JDK使用epool()代替傳統的select實現,因此它並無最大連續句柄1024/2048的限制。這也就意味着只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。數組

NIO服務端序列圖

通常流程

打開ServerSocketChannel,用於監聽客戶端的連接

ServerSocketChannel acceptor = ServerSocketChannel.open();

綁定監聽端口,設置鏈接爲非阻塞模式

int port = 8080;
    acceptor.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
    acceptor.configureBlocking(false);

建立Reactor線程,建立多路複用器並啓動線程

Selector selector = Selector.open();
    new Thread(new ReactorTask()).start();

將ServerSocketChannel註冊到Reactor線程的多路複用器Selcetor上

SelectionKey key = acceptor.register(selector,SelectionKey.OP_ACCEPT,ioHandler);

輪詢

int num = selector.select();
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> keys = selectedKeys.iterator();

    while(keys.hasNext()){
        SelectionKey key = keys.next();
        //doWhat
    }

新的客戶端接入

SocketChannel sc = serverChannel.accept();

設置位非阻塞模式

sc.configureBlocking(false);
    sc.socket().setReuseAddress(true);

將新接入的客戶端鏈接註冊到Reactor上的多路複用器

SelectionKey key = sc.register(selector,SelectionKey.OP_READ);

異步讀取客戶端消息到緩衝區

int number = sc.read(receivedBuffer);

最後讀取bytebuffer

while(buffer.hasRemain){
        writeBuffer();
    }

TimeServer示例

MultiplexerTimeServer.class

/**
 * used to test nio
 * Created by spark on 10/14/16.
 */
public class MultiplexerTimeServer implements Runnable {
    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private volatile boolean stop;

    /**
     * 初始化多路複用器,綁定監聽端口
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            //設爲異步非阻塞
            serverSocketChannel.configureBlocking(false);
            //backlog設爲1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    /**
     * 根據key的操做位獲取網絡事件的類型   TCP三次握手過程
     * @param key
     * @throws IOException
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            if (key.isAcceptable()) {

                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);

            }

            if(key.isReadable()){
                SocketChannel sc = (SocketChannel) key.channel();
                //經過ByteBuffer讀取客戶端的請求信息   開闢1K的緩衝區
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if(readBytes > 0){
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes,"UTF-8");
                    System.out.println("The time server received order : " + body );
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
                    doWrite(sc,currentTime);
                }else if(readBytes < 0){
                    key.cancel();
                    sc.close();
                }else{

                }
            }
        }
    }

    /**
     * 經過ByteBuffer將應答消息異步發送給客戶端
     * @param socketChannel
     * @param response
     * @throws IOException
     */
    private void doWrite(SocketChannel socketChannel,String response) throws IOException {
        if(response != null && response.trim().length() > 0){
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
        }
    }

    @Override
    public void run() {
        //遍歷selector,間隔爲1s
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                //有就緒狀態的Channel時,selector返回就緒狀態的Channel的SelectionKey集合,經過對就緒狀態的Channel集合進行迭代,進行異步讀寫操做
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (IOException e) {
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }

                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

TimeServer.class

public class TimeServer {
    public static void main(String[] args) {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        
        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();

    }
}

NIO客戶端序列圖

通常流程

打開SocketChannel,綁定客戶端本地地址

SocketChannel clientChannel = SocketChannel.open();

設置SocketChannel爲非阻塞模式

clientChannel.configureBlocking(false);
    clientChannel.socket().setReuseAddress(true);
    clientChannel.socket().setReceiveBufferSize(BUFFER_SIZE);
    clientChannel.socket().setSendBufferSize(BUFFER_SIZE);

異步鏈接服務端

boolean connected = clientChannel.connect(new InetSocketAddress("ip",port));

判斷 註冊

if(connected){
        clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
    }else{
        clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
    }

建立Reactor線程,建立多路複用器並啓動線程

Selector selector = Selector.open();
    new Thread(new ReactorTask()).start();

輪詢

int num = selector.select();
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> keys = selectedKeys.iterator();

    while(keys.hasNext()){
        SelectionKey key = keys.next();

    }

接受connect事件進行處理

if(key.isConnectable()){
        //handleConnect
    }

鏈接成功,註冊讀事件

if(clientChannel.finishConnect()) registerRead();

異步讀和消息讀取

int number = sc.read(receivedBuffer);
    while(buffer.hasRemain){

    }

TimeClient示例

TimeClientHandler.class

public class TimeClientHandler implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandler(int port, String host) {
        this.port = port;
        this.host = host == null ? "127.0.0.1" : host;

        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                if(sc.finishConnect()){
                    sc.register(selector,SelectionKey.OP_READ);
                    doWrite(sc);
                }else{
                    System.exit(1);
                }
            }
            if(key.isReadable()){
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if(readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is " + body);
                    this.stop = true;
                }else if(readBytes < 0){
                    key.cancel();
                    sc.close();
                }else{

                }
            }
        }
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()){
            System.out.println("Send order 2 server succeed.");
        }
    }

    private void doConnect() throws IOException {
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            socketChannel.register(selector,SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else{
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (IOException e) {
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }

                }
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        if(selector != null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

TimeClient.class

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new Thread(new TimeClientHandler(port,"127.0.0.1"),"TimeClient-001").start();
    }
}

AIO

NIO2.0引入了新的異步通道概念,並提供了異步文件通道和異步套接字通道的實現。
異步通道提供兩種方式獲取操做結果網絡

  1. 經過java.util.concurrent.Feature類來表示異步操做的結果;
  2. 在執行異步操做的時候傳入一個java.nio.channels。
    CompletionHandler接口的實現類做爲操做完成的回調。

AsyncTimeServerHandler.class

public class AsyncTimeServerHandler implements Runnable {
    private int port;

    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    public AsyncTimeServerHandler(int port) {
        this.port = port;
        try {
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); // 建立一個異步服務端通道。
            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));// bind 一個監聽端口
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {

        latch = new CountDownLatch(1); // 在完成一組正在執行的操做以前,容許當前的線程一直阻塞。
        doAccept();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doAccept() {
        asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());// 處理接受消息的通知。
    }
}

AcceptCompletionHandler.class

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
    @Override
    public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer, buffer, new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
    }
}

ReadCompletionHandler.class

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel channel;

    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        if (this.channel == null)
            this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
            String req = new String(body, "UTF-8");
            System.out.println("The time server receive order : " + req);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
                    System.currentTimeMillis()).toString() : "BAD ORDER";
            doWrite(currentTime);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doWrite(String currentTime) {
        if (currentTime != null && currentTime.trim().length() > 0) {
            byte[] bytes = (currentTime).getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer, writeBuffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            // 若是沒有發送完成,繼續發送
                            if (buffer.hasRemaining())
                                channel.write(buffer, buffer, this);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                channel.close();
                            } catch (IOException e) {
                                // ingnore on close
                            }
                        }
                    });
        }
    }

}

TimeServer.class

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
        new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
    }
}

AsyncTimeClientHandler.class

public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable{

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        client.write(writeBuffer, writeBuffer,
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer buffer) {
                        if (buffer.hasRemaining()) {
                            client.write(buffer, buffer, this);
                        } else {
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            client.read(
                                    readBuffer,
                                    readBuffer,
                                    new CompletionHandler<Integer, ByteBuffer>() {
                                        @Override
                                        public void completed(Integer result, ByteBuffer buffer) {
                                            buffer.flip();
                                            byte[] bytes = new byte[buffer.remaining()];
                                            buffer.get(bytes);
                                            String body;
                                            try {
                                                body = new String(bytes, "UTF-8");
                                                System.out.println("Now is : " + body);
                                                latch.countDown();
                                            } catch (UnsupportedEncodingException e) {
                                                e.printStackTrace();
                                            }
                                        }

                                        @Override
                                        public void failed(Throwable exc, ByteBuffer attachment) {
                                            try {
                                                client.close();
                                                latch.countDown();
                                            } catch (IOException e) {
                                                // ingnore on close
                                            }
                                        }
                                    });
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            client.close();
                            latch.countDown();
                        } catch (IOException e) {
                            // ingnore on close
                        }
                    }
                });
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        try {
            client.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        latch = new CountDownLatch(1);
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

TimeClient.class

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start();

    }
}
相關文章
相關標籤/搜索