JavaNIO基礎

NIO主要包含兩部分,Selector和Channel、Buffer。java

爲何須要須要NIO

基本的Java套接字對於小規模系統能夠很好地運行,但當涉及同時處理上千個客戶端的服務器時,可能就會產生一些問題。因爲建立、維護、切換線程須要的系統開銷,一客戶一線程的方式在系統擴展性方面受到了限制。使用線程池能夠節省那種系統開銷,同時容許實現者利用並行硬件的優點。程序員

可是對於鏈接生存期比較長的協議來講,線程池的大小仍然限制了系統同時能夠處理的客戶端數量。數組

另外對於服務器須要由不一樣客戶端同時訪問和修改的信息時,對於多線程就得進行同步,這會變得更加複雜,即便用同步機制將增長更多的系統調度和上下文切換開銷,而程序員對這些開銷又沒法控制。服務器

因爲多線程的同步的複雜性,一些程序員寧願繼續使用單線程方法,這類服務器只用一個線程來處理全部客戶端——不是順序處理,而是一次所有處理。這種服務器不能爲任何客戶端提供I/O操做的阻塞等待,而必須排他地使用非阻塞方式(nonblocking)I/O。網絡

前面的while true,不斷地輪詢(poll)accept方法,這種忙等(busy waiting)方法會引入系統開銷,由於程序須要反覆循環地鏈接I/O源,卻又發現什麼都不用作。多線程

咱們須要一種方法來一次輪詢一組客戶端,以查找那個客戶端須要服務,這正是NIO要介紹的Selector和Channel的抽象關鍵點。socket

一個Channel實例表明了一個可輪詢(pollable)的I/O目標,如套接字(或一個文件、設備等)。Channel可以註冊一個Selector類的實例。this

Selector的select方法容許你詢問在一組信道中,哪個當前須要服務(被接受、讀、寫)。操作系統

Stream的抽象,好處是隱藏了底層緩衝區的有限性,提供了一個可以容納任意長度數據的容器的假象。要實現這樣一個假象,要麼會產生大量的內存開銷,要麼會引入大量的上下文切換,很差控制。線程

使用Buffer抽象的緣由是:Buffer抽象表明了一個有限容量(finite-capacity)的數據容器——其本質是一個數組,由指針指示了在哪存放數據和從哪讀取數據。使用Buffer的好處是:
1)與讀寫緩衝區數據相關聯的系統開銷都暴露給了程序員。例如,若是想要向緩衝區存入數據,可是又沒有足夠的空間時,就必須採起一些措施來得到空間(即移出一些數據,或移開已經在那個位置的數據來得到空間,或者建立一個新的新的實例)。這意味着須要額外的工做,可是你能夠控制它何時發生,如何發生,以及是否發生。
2)一些對Java對象的特殊Buffer映射操做可以直接操做底層平臺的資源(例如操做系統的緩衝區),這些操做節省了在不一樣地址空間中複製數據的開銷。

綜上,Channel實例表明了一個與設備的鏈接,經過它能夠進行輸入輸出操做。信道(channel)和套接字(socket)的不一樣之處在於:channel一般須要調用靜態工廠方法來獲取實例。channel使用的不是流,而是使用緩衝區來發送或讀取數據。

Buffer有固定的、有限的容量,並由內部狀態記錄了有多少數據放入或取出,就像是一個有限容量的隊列同樣。

Selector

NIO的強大功能部分來自於channel的非阻塞特性。accept可能由於等待一個客戶端鏈接而阻塞,read可能由於沒有數據可讀而阻塞,直到鏈接的另外一端傳來新數據。

總的來講,建立/接收鏈接或讀寫數據等I/O調用,均可能無限期地阻塞等待,直到底層的網絡實現發生了什麼。慢速的、有損耗的網絡,或僅僅是簡單的網絡故障均可能致使任意時間的延遲。

而NIO則當即返回:

public class TCPEchoClientNonblocking {
    public static void main(String args[]) throws Exception {
        if ((args.length < 2) || (args.length > 3)) // Test for correct # of args
            throw new IllegalArgumentException("Parameter(s): <Server> <Word> [<Port>]");
        String server = args[0]; // Server name or IP address
        // Convert input String to bytes using the default charset
        byte[] argument = args[1].getBytes();
        int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7;
        // Create channel and set to nonblocking
        SocketChannel clntChan = SocketChannel.open();
        clntChan.configureBlocking(false);
        // Initiate connection to server and repeatedly poll until complete
        if (!clntChan.connect(new InetSocketAddress(server, servPort))) {
            while (!clntChan.finishConnect()) {
                System.out.print(".");  // Do something else
            }
        }
        ByteBuffer writeBuf = ByteBuffer.wrap(argument);
        ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
        int totalBytesRcvd = 0; // Total bytes received so far
        int bytesRcvd; // Bytes received in last read
        while (totalBytesRcvd < argument.length) {
            if (writeBuf.hasRemaining()) {
                clntChan.write(writeBuf);
            }
            if ((bytesRcvd = clntChan.read(readBuf)) == -1) {
                throw new SocketException("Connection closed prematurely");
            }
            totalBytesRcvd += bytesRcvd;
            System.out.print(".");   // Do something else
        }
        System.out.println("Received: " +  // convert to String per default charset
                new String(readBuf.array(), 0, totalBytesRcvd));
        clntChan.close();
    }
}

上面的輪詢僅僅是演示用。

須要使用Selector類來避免忙等的輪詢。考慮一個即時的消息服務器,可能有上千個客戶端同時鏈接到了服務器,但任什麼時候刻都只有很是少許的消息須要讀取和分發。這就須要一種方法阻塞等待,直到至少有一個信道能夠進行I/O操做,並指出是哪一個信道。NIO的選擇器就實現了這樣的功能。一個Selector實例能夠同時檢查一組信道的I/O狀態。用專業術語來講,選擇器就是一個多路開關選擇器,由於一個選擇器可以管理多個信道上的I/O操做。

要使用選擇器,須要建立一個Selector實例並將其註冊到想要監控的信道上(注意,這要經過channel的方法實現,而不是使用selector的方法)。最後,調用選擇器的select方法,該方法會阻塞等待,直到還有一個或更多的信道準備好了I/O操做或等待超時。select方法返回可進行I/O操做的信道數量。

public class TCPServerSelector {
    private static final int BUFSIZE = 256;  // Buffer size (bytes)
    private static final int TIMEOUT = 3000; // Wait timeout (milliseconds)
    public static void main(String[] args) throws IOException {
        if (args.length < 1) { // Test for correct # of args
            throw new IllegalArgumentException("Parameter(s): <Port> ...");
        }
        // Create a selector to multiplex listening sockets and connections
        Selector selector = Selector.open();
        // Create listening socket channel for each port and register selector
        for (String arg : args) {
            ServerSocketChannel listnChannel = ServerSocketChannel.open();
            listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));
            listnChannel.configureBlocking(false); // must be nonblocking to register
            // Register selector with channel. The returned key is ignored
            listnChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        // Create a handler that will implement the protocol
        TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
        while (true) { // Run forever, processing available I/O operations
            // Wait for some channel to be ready (or timeout)
            if (selector.select(TIMEOUT) == 0) { // returns # of ready chans
                System.out.print(".");
                continue;
            }
            // Get iterator on set of keys with I/O to process
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
            while (keyIter.hasNext()) {
                SelectionKey key = keyIter.next(); // Key is bit mask
                // Server socket channel has pending connection requests?
                if (key.isAcceptable()) {
                    protocol.handleAccept(key);
                }
                // Client socket channel has pending data?
                if (key.isReadable()) {
                    protocol.handleRead(key);
                }
                // Client socket channel is available for writing and
                // key is valid (i.e., channel not closed)?
                if (key.isValid() && key.isWritable()) {
                    protocol.handleWrite(key);
                }
                keyIter.remove(); // remove from set of selected keys
            }
        }
    }
}

因爲select方法只是向selector所關聯的鍵集合中添加元素,所以,若是不移除每一個處理過的鍵,它就會在下次調用select方法時仍然保留在集合中,並且可能會有無用的操做來調用它。

具體的處理方法

public class EchoSelectorProtocol implements TCPProtocol {
    private int bufSize; // Size of I/O buffer
    public EchoSelectorProtocol(int bufSize) {
        this.bufSize = bufSize;
    }
    public void handleAccept(SelectionKey key) throws IOException {
        SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
        clntChan.configureBlocking(false); // Must be nonblocking to register
        // Register the selector with new channel for read and attach byte buffer
        clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer
                .allocate(bufSize));
    }
    public void handleRead(SelectionKey key) throws IOException {
        // Client socket channel has pending data
        SocketChannel clntChan = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = clntChan.read(buf);
        if (bytesRead == -1) { // Did the other end close?
            clntChan.close();
        } else if (bytesRead > 0) {
            // Indicate via key that reading/writing are both of interest now.
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }
    public void handleWrite(SelectionKey key) throws IOException {
    /*
     * Channel is available for writing, and key is valid (i.e., client channel
     * not closed).
     */
        // Retrieve data read earlier
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip(); // Prepare buffer for writing
        SocketChannel clntChan = (SocketChannel) key.channel();
        clntChan.write(buf);
        if (!buf.hasRemaining()) { // Buffer completely written?
            // Nothing left, so no longer interested in writes
            key.interestOps(SelectionKey.OP_READ);
        }
        buf.compact(); // Make room for more data to be read in
    }
}
相關文章
相關標籤/搜索