Java網絡編程學習A輪_06_NIO入門

參考資料:
老外寫的教程,很適合入門:http://tutorials.jenkov.com/java-nio/index.html
上面教程的譯文:http://ifeve.com/overview/html

示例代碼:
https://github.com/gordonklg/study,socket modulejava

A. 摘要

由於有現成的教程,本文只作摘要。git

NIO 有三寶,channel、buffer、selectorgithub

Channel 與 Stream 很類似,除了:編程

  • Channel 同時支持讀操做與寫操做,而 Stream 是單向的
  • Channel 支持異步讀寫
  • Channel 讀寫操做與 Buffer 綁定,只能把數據從 Channel 讀取出來放到 Buffer 中,或是把 Buffer 中的數據寫到 Channel 中

Buffer 本質上是一個內存塊,Buffer 包裝了這個內存塊,提供一系列方法簡化在該內存塊上的數據讀寫操做。服務器

Buffer 有三個屬性:網絡

  • capacity:容量
  • position:當前操做位置
  • limit:容許到達的界限

其中 capacity 只能在建立時指定,沒法修改。其它兩個屬性都有對應的讀取與設值方法。dom

Buffer 及 Channel 主要方法的手繪示意圖以下:
一張圖片異步

Selector 設計目的是使單線程能夠處理多個網絡鏈接(多個 Channel)。對於存在大量鏈接可是每一個鏈接佔用帶寬都很少的應用,例如聊天工具、滴滴收集車輛位置信息、物聯網收集設備信息等,傳統 Socket 編程須要爲每個鏈接分配一個處理線程,佔用大量系統資源。咱們須要一種方案,可讓一個線程負責多個鏈接。
一張圖片socket

Selector 容許 Channel 註冊到本身身上,SelectionKey 表示 channel 與 selector 的註冊關係。

Channel 能產生4種事件,分別是:

  • SelectionKey.OP_CONNECT // channel 已成功鏈接到服務器
  • SelectionKey.OP_ACCEPT // server channel 已成功接受一個鏈接
  • SelectionKey.OP_READ // channel 中有可讀數據
  • SelectionKey.OP_WRITE // channel 能夠發送數據

能夠設置 Selector 關注 Channel 的哪些事件。Selector 的 select() 方法會阻塞,直到註冊的 Channel 產生了指定類型的事件(實際意義就是 Channel 已經準備好作某事了)。接着就能夠經過 Selector 獲取全部已經準備好的 SelectionKey(即Channel),依次處理相應事件,例如創建鏈接、獲取數據、業務處理、發送數據等。

顯然,同一個 selector 的全部 channel 對數據的讀寫以及業務邏輯的實現,在默認狀況下,都是在同一個線程中的。須要注意業務邏輯是否會過分佔用當前線程資源,致使整個 Selector 效率低下。能夠引入工做線程池解決以上問題。

SelectionKey 對象包含如下屬性:

  • The interest set,Selector 感興趣的 Channel 事件類型
  • The ready set,Channel 已經準備好的事件。顯然,被 Selector.select() 方法選中的 SelectionKey,其 ready set 應該與 interest set 有交集
  • The Channel,經過 SelectionKey 能夠獲取 Channel 對象
  • The Selector,經過 SelectionKey 能夠獲取 Selector 對象
  • An attached object (optional)

Selector 用法示意:

Selector selector = Selector.open(); // 獲取一個 Selector 實例
    channel.configureBlocking(false);  // 只有非阻塞模式的 channel 才能使用 Selector
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ); // 將 channel 註冊到 Selector 上,同時指定 Selector 只關注 channel 的 READ 事件
    while(true) {
        int readyChannels = selector.select(); // Selector 的 select 方法會阻塞,直到有已經準備好的(有數據可讀的) channel,或是 Selector 被 wakeup,或是線程被中斷
        if(readyChannels == 0) continue;
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
        while(keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            if(key.isAcceptable()) {
                    // a connection was accepted by a ServerSocketChannel.
            } else if (key.isConnectable()) {
                    // a connection was established with a remote server.
            } else if (key.isReadable()) {
                    // a channel is ready for reading
            } else if (key.isWritable()) {
                    // a channel is ready for writing
            }
            keyIterator.remove();
        }
    }

B. 示例代碼

gordon.study.socket.nio.basic.SimpleFileChannel.java

public class SimpleFileChannel {

    public static void main(String[] args) throws Exception {
        String path = SimpleFileChannel.class.getResource("/file1").getPath();
        RandomAccessFile aFile = new RandomAccessFile(path, "rw");
        FileChannel inChannel = aFile.getChannel();
        ByteBuffer buf = ByteBuffer.allocate(48);
        int bytesRead = inChannel.read(buf);
        while (bytesRead != -1) {
            System.out.print("(Read " + bytesRead + ")");
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            buf.clear();
            bytesRead = inChannel.read(buf);
            System.out.println();
        }
        aFile.close();
    }
}

以上示例代碼演示了最基本的 Channel 與 Buffer API。

gordon.study.socket.nio.basic.SimpleSelector.java

public class SimpleSelector {

    public static void main(String[] args) throws Exception {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Selector selector = Selector.open();
                    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                    serverSocketChannel.bind(new InetSocketAddress(8888));
                    serverSocketChannel.configureBlocking(false);
                    System.out.println("##valid ops for server socket channel: " + serverSocketChannel.validOps());
                    SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                    System.out.println("##Selection key ready ops before Selector.select(): " + sk.readyOps());

                    while (true) {
                        int readyChannels = selector.select();
                        System.out.println("readyChannels by Selector.select(): " + readyChannels);
                        if (readyChannels == 0) {
                            continue;
                        }
                        Set<SelectionKey> selectedKeys = selector.selectedKeys();
                        System.out.println("selected keys by Selector.select(): " + selectedKeys.size());
                        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            System.out.println("##Selection key ready ops after Selector.select(): " + key.readyOps());
                            SocketChannel channel = serverSocketChannel.accept();
                            if (channel != null) {
                                // create a new thread to handle this client
                            }
                            keyIterator.remove();
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        for (int i = 0; i < 3; i++) {
            Thread.sleep(400);
            new Thread(new Client()).start();
        }
    }

    private static class Client implements Runnable {

        @Override
        public void run() {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(8888));
                System.out.println("    Connected to server!");
                while (true) {
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

以上示例代碼使用 Selector 處理服務端鏈接創建過程。

代碼第14行將 ServerSocketChannel 註冊到 Selector 上,同時代表關注 ServerSocketChannel 的 Accept 事件(ServerSocketChannel 只支持這一種事件),顯然,這時候 ServerSocketChannel 還沒有準備好 Accept 事件,因此第15行代碼打印出的 ready ops 爲 0。

片刻後(400ms),第一個客戶端成功鏈接到服務端,此時 ServerSocketChannel 產生 Accept 事件,Selector.select() 方法返回,因爲 Selector 只註冊了一個 Channel,返回值顯然是1。而後遍歷被選中的 SelectionKey 列表,建立 SocketChannel 處理本次鏈接。

代碼第35行經過 sleep 的方法模擬複雜環境下建立 SocketChannel 耗時較長的狀況。這產生了一個有趣的現象:客戶端很早就完成了鏈接(socket.isConnected() == true),可是服務端要等待 sleep 時間耗盡後才能創建一個 SocketChannel,也就是說,雖然服務端尚未經過 ServerSocketChannel.accept() 方法建立出一個 SocketChannel,可是實際上 TCP 鏈接已經創建完成??(不甚理解)

大概推測,ServerSocketChannel 內部有地方保存已創建好的 TCP 鏈接(操做系統層面的已創建),accept() 方法被調用時,會將一個底層 TCP 鏈接包裝爲 SocketChannel。推斷的理由一是客戶端 socket 狀態是已鏈接(也就是三次握手已經完成),另外一點是,若是註釋掉代碼第29行的 accept() 方法調用,會發現 Selector.select() 方法在第一個客戶端鏈接過來後,幾乎就不會被阻塞了(注掉第35行的 sleep 更加明顯),也就是說,ServerSocketChannel 的 Accept 事件是按照有沒有待處理的客戶端鏈接來肯定的。

代碼執行輸出以下:

##valid ops for server socket channel: 16
##Selection key ready ops before Selector.select(): 0
    Connected to server!
readyChannels by Selector.select(): 1
selected keys by Selector.select(): 1
##Selection key ready ops after Selector.select(): 16
    Connected to server!
    Connected to server!
readyChannels by Selector.select(): 1
selected keys by Selector.select(): 1
##Selection key ready ops after Selector.select(): 16
readyChannels by Selector.select(): 1
selected keys by Selector.select(): 1
##Selection key ready ops after Selector.select(): 16


觀察輸出,顯然,每一個 ServerSocketChannel 在一次 Selector.select 大輪詢中,只創建了一個 Socket 鏈接,哪怕實際上當時有多個鏈接能夠創建。若是咱們把創建鏈接的 ServerSocketChannel 與處理數據讀寫的 SocketChannel 註冊到同一個 Selector 上,可能致使鏈接請求來不及處理。
若是將代碼第29行優化爲如下邏輯:

SocketChannel channel = serverSocketChannel.accept();
                            while (channel != null) {
                                channel = serverSocketChannel.accept();
                            }

這樣改後,若是短期有大量鏈接,會致使業務處理收到衝擊,可能長時間得不到響應(線程資源都花在創建鏈接上了)。因此,更合理的方法是將負責創建鏈接的 ServerSocketChannel 與處理數據讀寫的 SocketChannel 註冊到不一樣的 Selector 上。

最後一個細節是代碼第33行的移除 selection key。Java NIO 的 Selector 會將已準備好而且用戶關注的 SelectionKey 加入 selectedKeys 集合,可是不會主動刪除。所以,當咱們肯定本次事件已經處理完畢時,要主動移除掉該 selection key,不然下次獲取 selectedKeys 集合時,該 selection key 仍是在集合中。(此段還沒有徹底確認)

相關文章
相關標籤/搜索