java網絡通訊:異步非阻塞I/O (NIO)

首先是channel,是一個雙向的全雙工的通道,可同時讀寫,而輸入輸出流都是單工的,要麼讀要麼寫。Channel分爲兩大類,分別是用於網絡數據的SelectableChannel和用於文件操做的FileChannel。java

注意:在java NIO庫中,全部的數據都是用緩衝區處理,經常使用的是ByteBuffer。數組

多路複用器Selector:緩存

Selector會不斷輪詢註冊在其上的Channel,若是某個Channel上又新的鏈接接入、讀和寫事件,這個Channel就處於就緒狀態,經過SelectorKey能夠獲取就緒Channel的集合。底層使用了epoll()實現,沒有最大鏈接句柄的限制。服務器

服務端代碼:網絡

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);//設置非阻塞模式
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);//將Channel註冊到selector,監聽accept事件
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
    this.stop = true;
    }

    @Override
    public void run() {
    while (!stop) {
        try {
            selector.select(1000);//每隔一秒輪詢一次
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) {//若是有新的客戶端接入
                key = it.next();
                it.remove();
                try {
                    handleInput(key);//處理請求
                } catch (Exception e) {
                    if (key != null) {
                        key.cancel();
                        if (key.channel() != null)
                            key.channel().close();
                    }
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    // 多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源
    if (selector != null)
        try {
        selector.close();
        } catch (IOException e) {
        e.printStackTrace();
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 處理新接入的請求消息
            if (key.isAcceptable()) {
                // 獲取客戶端的鏈接通道
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                //將新鏈接註冊到selector,並監聽讀事件
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                //讀取通道數據寫入字節緩存
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);//字節緩存寫入字節數組
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : " + body);
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new java.util.Date(System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);//向socketchannel寫入數據
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                } 
            }
        }
    }

    private void doWrite(SocketChannel channel, String response)
        throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}

客戶端:socket

public class TimeClient {
    
    public static void main(String[] args) {
        int port = 8080;
        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
    }
}
public class TimeClientHandle implements Runnable {

    private String host;
    private int port;

    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        // 多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

    }
    private void doConnect() throws IOException {
        // 若是直接鏈接成功,則註冊到多路複用器上,發送請求消息,讀應答
        if (!socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else//沒有直接鏈接成功,不表明失敗,而是說明服務器尚未返回TCP握手的應答消息,因此註冊OP_CONNECT事件,監聽消息。
            socketChannel.register(selector, SelectionKey.OP_CONNECT);    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 判斷是否鏈接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {//判斷是否有鏈接事件,是的話,說明未鏈接
                if (sc.finishConnect()) {//再鏈接
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else
                    System.exit(1);// 鏈接失敗,進程退出
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                }
            }
        }

    }
    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining())
            System.out.println("Send order 2 server succeed.");
    }
}
相關文章
相關標籤/搜索