NIO初識

  Java編程中的NIO,俗稱new I/O,是在JDK1.4版本以後開始引入的,在JDK1.4以前,Java服務端大多使用同步阻塞式來處理網絡請求,在低流量、低併發狀況還能抗住,在現在互聯網時代,信息量很明顯差遠了,在沒有NIO以前,服務器端通訊模塊基本被C/C++佔據着,它們能夠利用操做系統的原生API來處理非阻塞事件,隨着java的nio類庫發佈,通過不斷髮展完善,性能也逐漸與c++媲美了,加上JAVA不少優秀的開源類庫,使用更普遍了,如今,來了解一下nio的原理,作一個感官上的認識。java

  使用NIO,必須記住以下3個核心概念,編程實現就是圍繞他們的關係的:c++

  1. 緩衝區Buffer:          在nio編程中,讀寫都在基於緩衝區的,區別於以前的基於流的,根據用途,可使用字節緩衝區、字符緩衝區等編程

  2. 通道Channel:        在Buffer裏的數據經過Channel與網絡交互,是全雙工的,而流數單工操做的數組

  3. 多路複用器Selector: 管理Channel,最基本的就是讀寫Channel,一個線程使用Selector來輪詢讀寫Channel,通道上有事件發生時,就會進行處理,相似一個函數指針集合,在BLE開發的底層OS上也是這樣處理的,增長一個模塊,只要寫好模塊函數,而後把函數指針放到功能數組就能夠了,後面就輪詢這個註冊了的函數,有置位就調用指針進行操做。這種模式能夠實現單線程就能支持上千萬併發鏈接。服務器

  下面新建一個工程來測試一下:網絡

  1. 新建一個TestNIO工程,目錄結構設爲以下:併發

    

  2. 實現服務器端,代碼以下:框架

    

package cn.linjk.testnio.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by LinJK on 19/11/2016.
 */
public class NioServer {
    private static final int serverPort = 8889;

    public static void main(String[] argc) {
        //啓動一個線程來處理Selector
        HelloServer helloServer = new HelloServer(serverPort);
        if (!helloServer.getInitResult()) {
            System.out.println("Init Error");
            System.exit(-1);
        }
        System.out.println("Hello Server listening on localhost:" + serverPort);

        new Thread(helloServer).start();
    }

}

class HelloServer implements Runnable {
    private Selector            selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean    stop;
    private ByteBuffer byteBufferWrite;
    private boolean             contrustorFlag;

    public HelloServer(int port) {
        try {
            selector            = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            contrustorFlag = true;
        }
        catch (Exception e) {
            contrustorFlag = false;
            e.printStackTrace();
        }
    }

    public boolean getInitResult() {
        return contrustorFlag;
    }

    public void stop() {
        stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000); //1秒輪詢週期,能夠按需修改

                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey selectionKey = null;

                while (it.hasNext()) {
                    selectionKey = it.next();
                    it.remove();

                    try {
                        //handle event
                        handleIncomeEvent(selectionKey);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        if (selectionKey != null) {
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        //User set stop listening, clear something
        if (selector != null) {
            try {
                selector.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleIncomeEvent(SelectionKey key) {
        if (key.isValid()) {
            //鏈接事件
            if (key.isAcceptable()) {
                try {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    //監聽到了鏈接事件,原有基礎上註冊監聽讀取用戶端數據事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }

            //讀到客戶端數據事件
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel)key.channel();
                ByteBuffer byteBufferRead = ByteBuffer.allocate(1024);
                try {
                    int readCnt = socketChannel.read(byteBufferRead);

                    if (readCnt > 0) {
                        byteBufferRead.flip();//刷新緩衝區,而後從緩衝區讀取數據

                        byte[] bytes = new byte[byteBufferRead.remaining()];
                        byteBufferRead.get(bytes);

                        String request = new String(bytes, "UTF-8");
                        System.out.println("Server receive: " + request);

                        //say hello to client
                        byteBufferWrite = ByteBuffer.allocate(20);
                        byteBufferWrite.put("[<<-]Hello".getBytes());
                        byteBufferWrite.flip();//刷新數據到緩衝區
                        socketChannel.write(byteBufferWrite);
                        //避免緩衝區已滿,形成寫數據不全現象,註冊寫事件,輪詢是否全部數據已寫完
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                    }
                    else if (readCnt < 0) {
                        key.cancel();
                        socketChannel.close();
                    }
                    else {
                        //
                    }
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }

            if (key.isWritable()) {
                SocketChannel socketChannel = (SocketChannel)key.channel();

                while (byteBufferWrite.hasRemaining()){
                    //.....
                }
            }
        }
        else {
            System.out.println("Input key unvalid");
        }
    }
}

 

  3. 實現客戶端,測試功能,有些異常沒有寫全,也沒實現重連服務器機制,只把框架寫了,代碼以下:socket

    

package cn.linjk.testnio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by LinJK on 19/11/2016.
 */
public class NioClient {
    private static Selector selector;
    private static SocketChannel socketChannel;
    private static volatile boolean stop;

    public static void main(String[] argc) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    selector = Selector.open();
                    socketChannel = SocketChannel.open();
                    socketChannel.configureBlocking(false);
                    //connect to server
                    if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8889))) {
                        //註冊監聽服務器返回事件
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        //send request to server
                        ByteBuffer byteBufferWrite = ByteBuffer.allocate(100);
                        byteBufferWrite.put("I am Jim".getBytes());
                        byteBufferWrite.flip();
                        socketChannel.write(byteBufferWrite);
                        if (!byteBufferWrite.hasRemaining()) {
                            System.out.println("Send Finish.");
                        }
                    }
                    else {
                        socketChannel.register(selector, SelectionKey.OP_CONNECT);
                    }

                    while (!stop) {
                        selector.select(1000);

                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> it = selectionKeys.iterator();

                        SelectionKey selectionKey = null;
                        while (it.hasNext()) {
                            selectionKey = it.next();
                            it.remove();

                            if (selectionKey.isValid()) {
                                SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                                if (selectionKey.isConnectable()) {
                                    if (socketChannel.finishConnect()) {
                                        socketChannel.register(selector, SelectionKey.OP_READ);
                                        //send data
                                        ByteBuffer byteBufferWrite = ByteBuffer.allocate(100);
                                        byteBufferWrite.put("I am Jim".getBytes());
                                        byteBufferWrite.flip();
                                        socketChannel.write(byteBufferWrite);
                                        if (!byteBufferWrite.hasRemaining()) {
                                            System.out.println("Send Finish.");
                                        }
                                    }
                                }
                                //收到服務器返回數據事件
                                if (selectionKey.isReadable()) {
                                    ByteBuffer byteBufferRead = ByteBuffer.allocate(100);

                                    int readCnt = socketChannel.read(byteBufferRead);
                                    if (readCnt > 0) {
                                        byteBufferRead.flip();

                                        byte[] bytes = new byte[byteBufferRead.remaining()];
                                        byteBufferRead.get(bytes);

                                        System.out.println("Receive from server: " + new String(bytes, "UTF-8"));
                                        stop = true;
                                    }
                                    else if (readCnt < 0){
                                        selectionKey.channel();
                                        socketChannel.close();
                                    }
                                }
                            }
                        }
                    }

                    if (selector != null) {
                        selector.close();
                    }
                }
                catch (IOException e) {
                    //資源清理....
                    System.exit(1);
                }
            }
        }).start();
    }

}

   4. 代碼分析:ide

    對比服務端和客戶端的代碼邏輯,有以下兩點類似:

    a. 程序啓動後建立一個線程來管理Selctor

    b. 都配置爲非阻塞操做,而後註冊SelctionKey到SocketChanell,而後在線程的run()函數裏輪詢哪一個事件發生了再進行操做

    流程都類似,稍微有點不同,看代碼並運行一下就明白了。

  5. 運行結果:

    先運行Server端,而後運行Client端,兩者輸出分別以下:

    Server:

    

    Client:

    

   6. 總結:

    NIO和IO直接最大區別就是,NIO是面向緩衝區的,IO是面向流的,面向緩衝區數據處理比較靈活,數據處理速度與吞吐量更大,同時保證數據完整性比較重要,前面提到緩衝區滿時,須要檢測"半包"也是這個意思,使用NIO的非阻塞避免了因網絡狀況阻塞形成的高併發環境下時延問題,在高併發通信狀況下,可使用它來處理通訊仍是很好的。

相關文章
相關標籤/搜索