我所理解的NIO

在Java的NIO中,有三個比較重要的概念:Buffer、Channel和Selector。java

結合上一篇文章提到的送花的例子。Buffer對應花,Channel對應A和B與花之間的聯繫,Selector就是不斷進行輪詢的線程。服務器

Channel分爲ServerSocketChannel和SocketChannel,是客戶端與服務端進行通訊的通道。socket

ServerSocketChannel用戶服務器端,職責就是監聽客戶端的鏈接請求。一旦經過容許,就會創建與該客戶端對應的SocketChannel。一個服務端的一個端口只能創建一個ServerSocketChannel用來監聽鏈接。ide

SocketChannel具備惟一性。一個客戶端可能連接多個服務端,那就是多個SocketChannel。服務端與多個客戶端創建的鏈接就有多個SocketChannel。學習

Selector是用來負責阻塞輪詢的線程,能夠經過其靜態方法Seletor.open()建立。服務端建立後經過Channel的register方法註冊到ServerSocketChannel上,等待客戶端鏈接。客戶端一樣建立Seletor後經過Channel的register方法註冊到SocketChannel上。ui

當客戶端的SocketChannel指定服務端的port和ip進行connect請求以後,服務端的Selector就能夠檢測到客戶端的connect請求。而後服務端accept表示繼續監聽下一個請求,同時能夠繼續在與客戶端創建了SocketChannel上監聽讀寫請求。客戶端同理。this

Selector的做用就是監聽SelectionKey.OP_ACCEPT(服務端專屬)、SelectionKey.OP_CONNECT(客戶端專屬)、SelectionKey.OP_READ、SelectionKey.OP_Write四種註冊的請求,一旦有請求被容許,就會調用相關的方法進行處理。.net

Buffer是用於在Channel中傳遞的數據。Buffer裏有4個屬性,來表示數據在Buffer中的存取狀況:線程

  • capacity:容量。Buffer的最大存儲量,建立時指定,使用過程當中不會改變
  • limit:上線。Buffer中已有數據的最大值,<=capacity
  • position:索引位置。position從0開始,隨着get和put方法自動更新,用來記錄實時數據的位置
  • mark:用來暫存position的值。mark後能夠經過reset方法將mark的值恢復到position

這4個屬性的大小關係是:mark<=position<=limit<=capacitycode

接下來經過一個客戶端與服務端通訊的例子,來學習使用NIO。客戶端每隔1秒向服務端發送請求,服務端響應並返回數據。

服務端:

package cn.testNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/** 
 * @Description : TODO
 * @Author : houshuiqiang@163.com, 2017年10月2日 下午2:58:31
 * @Modified :houshuiqiang@163.com, 2017年10月2日
 */
public class NioDemoServer {

    public static void main(String[] args) {
        NioServer nioServer = new NioServer(8181);
        new Thread(nioServer, "nio-server-test").start();
    }
}

class NioServer implements Runnable {
    
    private Selector selector;
    
    private ServerSocketChannel serverSocketChannel;
    
    private volatile boolean stop;
    
    public NioServer(int port){
        stop = false;
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    
    public void stop(){
        this.stop = true;
    }
    
    @Override
    public void run(){
        while (!stop){
            try {
                selector.select(); // 阻塞等待
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while(iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    try{
                        handleKey(selectionKey); // 可能發生客戶端失聯的錯誤
                    }catch(IOException e){
                        e.printStackTrace();
                        if (selectionKey != null) { // 將發生異常的客戶端關閉,不然會一直被selector輪詢到
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isValid()) {
            if (selectionKey.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel();
                SocketChannel socketChannel = ssc.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (selectionKey.isReadable()) {
                SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                String body = getBodyFromSocketChannel(socketChannel);
                if (null == body) {
                    // 斷開鏈路
                    selectionKey.cancel();
                    selectionKey.channel().close();
                }else if ("".equals(body)) {
                    // 心跳檢測 ,忽略
                }else{
                    String resultBody = handleBody(socketChannel, body);
                    write2Client(socketChannel, resultBody);
                }
            }
        }
        
    }


    private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException{
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteBufferSize = socketChannel.read(byteBuffer);
        if (byteBufferSize == 0) { // 心跳檢測,忽略
            return "";
        }else if (byteBufferSize > 0) {
            byteBuffer.flip();
            byte[] array = new byte[byteBuffer.remaining()];
            byteBuffer.get(array);
            return new String(array);
        }else{
            return null;
        }
    }
    
    private String handleBody(SocketChannel socketChannel, String body) {
        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
        
        System.out.println("message from client : " + hostAddress + ", content: " + body); // 模擬請求處理
        
        return "server received message: " + body; // 模擬返回處理結果
    }
    
    private void write2Client(SocketChannel socketChannel, String resultBody) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 真實場景每每比1024要大
        byteBuffer.put(resultBody.getBytes());
        byteBuffer.flip();
        socketChannel.register(selector, SelectionKey.OP_READ);
        socketChannel.write(byteBuffer);
    }
    
}

客戶端:

package cn.testNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** 
 * @Description : TODO
 * @Author : houshuiqiang@163.com, 2017年10月2日 下午5:58:49
 * @Modified :houshuiqiang@163.com, 2017年10月2日
 */
public class NioDemoClient {

    public static void main(String[] args) throws InterruptedException {
        NioClient nioClient = new NioClient("192.168.10.47", 8181);
        new Thread(nioClient, "nio-client-test").start();
        
        for (int i = 0; i < 10; i++) {
            nioClient.getQueue().offer("time" + i);
            Thread.sleep(1000);
        }
        nioClient.stop();
    }
}

class NioClient implements Runnable {
    private Selector selector;
    
    private SocketChannel socketChannel;
    
    private String address;
    
    private int port;
    
    private volatile boolean stop;
    
    private LinkedBlockingQueue<String> queue;
    
    public NioClient(String address, int port){
        this.address = address;
        this.port = port;
        this.stop = false;
        queue = new LinkedBlockingQueue<String>();
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    
    public BlockingQueue<String> getQueue(){
        return queue;
    }
    
    public void stop(){
        this.stop = true;
    }
    
    @Override
    public void run(){
        
        doConnect();
        
        while (!stop) {
            try {
                selector.select(); // 阻塞等待
                
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                
                while(iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    try{
                        handleKey(selectionKey); 
                    }catch(IOException e){
                        e.printStackTrace();
                        if (selectionKey != null) {
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }catch(InterruptedException e){
                        e.printStackTrace();
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        try {
            socketChannel.close(); // 優雅關閉連接
            selector.close(); // 直接selector.close()會關閉全部該seletor上的全部channel,可是服務器會接收到客戶端強制關閉的錯誤信息。
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doConnect() {
        try {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress(address, port));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    
    private void handleKey(SelectionKey selectionKey) throws IOException, InterruptedException {
        if (selectionKey.isValid()) {
            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
            if (selectionKey.isConnectable()) {
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }
            }
            if (selectionKey.isReadable()) {
                String resultBody = getBodyFromSocketChannel(socketChannel);
                if (null == resultBody) {
                    // 斷開鏈路
                    selectionKey.cancel();
                    selectionKey.channel().close();
                }else if ("".equals(resultBody)) {
                    // 心跳檢測 ,忽略
                }else{
                    System.out.println("received result : " + resultBody);
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }
            }
            if (selectionKey.isWritable()) {
                sendRequest(socketChannel);
            }
        }
    }

    private void sendRequest(SocketChannel socketChannel) throws IOException, InterruptedException {

        String requestBody = queue.poll(100, TimeUnit.MILLISECONDS);
        if (null != requestBody) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            byteBuffer.put(requestBody.getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            socketChannel.register(selector, SelectionKey.OP_READ);
            
            if (! byteBuffer.hasRemaining()) {
                System.out.println("send request to server : " + requestBody);
            }
        }else {
            socketChannel.register(selector, SelectionKey.OP_WRITE);
        }
    }
    
    private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteBufferSize = socketChannel.read(byteBuffer);
        if (byteBufferSize == 0) { // 心跳檢測,忽略
            return "";
        }else if (byteBufferSize > 0) {
            byteBuffer.flip();
            byte[] array = new byte[byteBuffer.remaining()];
            byteBuffer.get(array);
            return new String(array);
        }else{
            return null;
        }
    }
    
}
相關文章
相關標籤/搜索