初識NIO之Java小Demo

Java中的IO、NIO、AIO:

BIO:在Java1.4以前,咱們創建網絡鏈接均使用BIO,屬於同步阻塞IO。默認狀況下,當有一條請求接入就有一條線程專門接待。因此,在客戶端向服務端請求時,會詢問是否有空閒線程進行接待,如若沒有則一直等待或拒接。當併發量小時還能夠接受,當請求量一多起來則會有許多線程生成,在Java中,多線程的上下文切換會消耗計算機有限的資源和性能,形成資源浪費。java

NIO:NIO的出現是爲了解決再BIO下的大併發量問題。其特色是能用一條線程管理全部鏈接。以下圖所示: express

圖片來自網絡
NIO是同步非阻塞模型,經過一條線程控制選擇器(Selector)來管理多個Channel,減小建立線程和上下文切換的浪費。當線程經過選擇器向某條Channel請求數據但其沒有數據時,是不會阻塞的,直接返回,繼續幹其餘事。而一旦某Channel就緒,線程就能去調用請求數據等操做。當該線程對某條Channel進行寫操做時一樣不會被阻塞,因此該線程可以對多個Channel進行管理。

NIO是面向緩衝流的,即數據寫入寫出都是經過 Channel —— Buffer 這一途徑。(雙向流通)api

AIO:與以前兩個IO模型不一樣的是,AIO屬於異步非阻塞模型。當進行讀寫操做時只須調用api的read方法和write方法,這兩種方法均是異步。對於讀方法來講,當有流可讀取時,操做系統會將可讀的流傳入read方法的緩衝區,並通知應用程序;對於寫操做而言,當操做系統將write方法傳遞的流寫入完畢時,操做系統主動通知應用程序。換言之就是當調用完api後,操做系統完成後會調用回調函數。數組

總結:通常IO分爲同步阻塞模型(BIO),同步非阻塞模型(NIO),異步阻塞模型,異步非阻塞模型(AIO)bash

同步阻塞模型指的是當調用io操做時必須等到其io操做結束服務器

同步非阻塞模型指當調用io操做時沒必要等待能夠繼續幹其餘事,但必須不斷詢問io操做是否完成。網絡

異步阻塞模型指應用調用io操做後,由操做系統完成io操做,但應用必須等待或去詢問操做系統是否完成。多線程

異步非阻塞指應用調用io操做後,由操做系統完成io操做並調用回調函數,應用完成放手無論。併發

NIO的小Demo之服務端

首先,先看下服務端的大致代碼異步

public class ServerHandle implements Runnable{
    //帶參數構造函數
    public ServerHandle(int port){
        
    }
    //中止方法
    public void shop(){
        
    }
    //寫方法
    private void write(SocketChannel socketChannel, String  response)throws IOException{
        
    }
    //當有鏈接進來時的處理方法
    private void handleInput(SelectionKey key) throws IOException{
        
    } 
    
    //服務端運行主體方法
    @Override
    public void run() {
    
    }
}
複製代碼

首先咱們先看看該服務端的構造函數的實現:

public ServerHandle(int port){
        try {
            //建立選擇器
            selector = Selector.open();
            //打開監聽通道
            serverSocketChannel = ServerSocketChannel.open();
            //設置爲非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //傳入端口,並設定鏈接隊列最大爲1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //監聽客戶端請求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //標記啓動標誌
            started = true;
            System.out.println("服務器已啓動,端口號爲:" + port);
        } catch (IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
複製代碼

在這裏建立了選擇器和監聽通道,並將該監聽通道註冊到選擇器上並選擇其感興趣的事件(accept)。後續其餘接入的鏈接都將經過該 監聽通道 傳入。

而後就是寫方法的實現:

private void doWrite(SocketChannel channel, String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
        wirteBuffer.put(bytes);
        //將寫模式改成讀模式
        wirteBuffer.flip();
        //寫入管道
        channel.write(wirteBuffer);
    }
複製代碼

其次是當由事件傳入時,即對鏈接進來的連接的處理方法

private void handleInput(SelectionKey key) throws IOException{
        //當該鍵可用時
        if (key.isValid()){
            if (key.isAcceptable()){
                //返回該密鑰建立的通道。
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                經過該通道獲取連接進來的通道
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()){
                //返回該密鑰建立的通道。
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String expression = new String(bytes, "UTF-8");
                    System.out.println("服務器收到的信息:" + expression);
                    //此處是爲了區別打印在工做臺上的數據是由客戶端產生仍是服務端產生
                    doWrite(socketChannel, "+++++" + expression + "+++++");
                } else if(readBytes == 0){
                    //無數據,忽略
                }else if (readBytes < 0){
                    //資源關閉
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }
複製代碼

這裏要說明的是,只要ServerSocketChannel及SocketChannel向Selector註冊了特定的事件,Selector就會監控這些事件是否發生。 如在構造方法中有一通道serverSocketChannel註冊了accept事件。當其就緒時就能夠經過調用selector的selectorKeys()方法,訪問」已選擇鍵集「中的就緒通道。

壓軸方法:

@Override
    public void run() {
        //循環遍歷
        while (started) {
            try {
                //當沒有就緒事件時阻塞
                selector.select();
                //返回就緒通道的鍵
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key;
                while (iterator.hasNext()){
                    key = iterator.next();
                    //獲取後必須移除,不然會陷入死循環
                    iterator.remove();
                    try {
                        //對就緒通道的處理方法,上述有描述
                        handleInput(key);
                    } catch (Exception e){
                        if (key != null){
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Throwable throwable){
                throwable.printStackTrace();
            }
        }
    }
複製代碼

此方法爲服務端的主體方法。大體流程以下:

  1. 打開ServerSocketChannel,監聽客戶端鏈接
  2. 綁定監聽端口,設置鏈接爲非阻塞模式(阻塞模式下不能註冊到選擇器)
  3. 建立Reactor線程,建立選擇器並啓動線程
  4. 將ServerSocketChannel註冊到Reactor線程中的Selector上,監聽ACCEPT事件
  5. Selector輪詢準備就緒的key
  6. Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
  7. 設置客戶端鏈路爲非阻塞模式
  8. 將新接入的客戶端鏈接註冊到Reactor線程的Selector上,監聽讀操做,讀取客戶端發送的網絡消息 異步讀取客戶端消息到緩衝區
  9. 調用write將消息異步發送給客戶端

NIO的小Demo之客戶端

public class ClientHandle implements Runnable{
    //構造函數,構造時順便綁定
    public ClientHandle(String ip, int port){
        
    }
    //處理就緒通道
    private void handleInput(SelectionKey key) throws IOException{
        
    }
    //寫方法(與服務端的寫方法一致)
    private void doWrite(SocketChannel channel,String request) throws IOException{
        
    }
    //鏈接到服務端
    private void doConnect() throws IOException{
        
    }
    //發送信息
    public void sendMsg(String msg) throws Exception{
        
    }
}
複製代碼

首先先看構造函數的實現:

public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //建立選擇器
            selector = Selector.open();
            //打開監聽通道
            socketChannel = SocketChannel.open();
            //若是爲 true,則此通道將被置於阻塞模式;若是爲 false,則此通道將被置於非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
複製代碼

接下來看對就緒通道的處理辦法:

private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                //這裏的做用將在後面的代碼(doConnect方法)說明
                if(sc.finishConnect()){
                    System.out.println("已鏈接事件");
                }
                else{
                    System.exit(1);
                }
            }
            //讀消息
            if(key.isReadable()){
                //建立ByteBuffer,並開闢一個1k的緩衝區
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //讀取請求碼流,返回讀取到的字節數
                int readBytes = sc.read(buffer);
                //讀取到字節,對字節進行編解碼
                if(readBytes>0){
                    buffer.flip();
                    //根據緩衝區可讀字節數建立字節數組
                    byte[] bytes = new byte[buffer.remaining()];
                    //將緩衝區可讀字節數組複製到新建的數組中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客戶端收到消息:" + result);
                }lse if(readBytes==0){
                    //忽略
                }else if(readBytes<0){
                    //鏈路已經關閉,釋放資源
                    key.cancel();
                    sc.close();
                }
            }
        }
    }
複製代碼

在run方法以前需先看下此方法的實現:

private void doConnect() throws IOException{
        
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            System.out.println("connect");
        }
        else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("register");
        }
    }
複製代碼

當SocketChannel工做於非阻塞模式下時,調用connect()時會當即返回: 若是鏈接創建成功則返回的是true(好比鏈接localhost時,能當即創建起鏈接),不然返回false。

在非阻塞模式下,返回false後,必需要在隨後的某個地方調用finishConnect()方法完成鏈接。 當SocketChannel處於阻塞模式下時,調用connect()時會進入阻塞,直至鏈接創建成功或者發生IO錯誤時,才從阻塞狀態中退出。

因此該代碼在connect服務端後返回false(但仍是有做用的),並在else語句將該通道註冊在選擇器上並選擇connect事件。

客戶端的run方法:

@Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循環遍歷selector
        while(started){
            try{
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key ;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector關閉後會自動釋放裏面管理的資源
        if(selector != null){
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
複製代碼

發送信息到服務端的方法:

public void sendMsg(String msg) throws Exception{
        //覆蓋其以前感興趣的事件(connect),將其更改成OP_READ
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }
複製代碼

完整代碼:

服務端:

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Service {
    private static int DEFAULT_POST = 12345;
    private static ServerHandle serverHandle;
    public static void start(){
        start(DEFAULT_POST);
    }

    public static synchronized void start(int post) {
        if (serverHandle != null){
            serverHandle.shop();
        }
        serverHandle = new ServerHandle(post);
        new Thread(serverHandle,"server").start();
    }
}
複製代碼

服務端主體:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by innoyiya on 2018/8/20.
 */
public class ServerHandle implements Runnable{

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean started;

    public ServerHandle(int port){
        try {
            //建立選擇器
            selector = Selector.open();
            //打開監聽通道
            serverSocketChannel = ServerSocketChannel.open();
            //設置爲非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //斷定端口,並設定鏈接隊列最大爲1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //監聽客戶端請求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //標記啓動標誌
            started = true;
            System.out.println("服務器已啓動,端口號爲:" + port);
        } catch (IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void shop(){
        started = false;
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
        wirteBuffer.put(bytes);
        wirteBuffer.flip();
        channel.write(wirteBuffer);
    }

    private void handleInput(SelectionKey key) throws IOException{
        if (key.isValid()){
            if (key.isAcceptable()){
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()){
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String expression = new String(bytes, "UTF-8");
                    System.out.println("服務器收到的信息:" + expression);
                    doWrite(socketChannel, "+++++" + expression + "+++++");
                } else if (readBytes < 0){
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

    @Override
    public void run() {
        //循環遍歷
        while (started) {
            try {
                selector.select();
                //System.out.println(selector.select());
                Set<SelectionKey> keys = selector.selectedKeys();
                //System.out.println(keys.size());
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key;
                while (iterator.hasNext()){
                    key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e){
                        if (key != null){
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Throwable throwable){
                throwable.printStackTrace();
            }
        }
    }
}
複製代碼

客戶端:

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Client {
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 12345;
    private static ClientHandle clientHandle;
    private static final String EXIT = "exit";

    public static void start() {
        start(DEFAULT_HOST, DEFAULT_PORT);
    }

    public static synchronized void start(String ip, int port) {
        if (clientHandle != null){
            clientHandle.stop();
        }
        clientHandle = new ClientHandle(ip, port);
        new Thread(clientHandle, "Server").start();
    }

    //向服務器發送消息
    public static boolean sendMsg(String msg) throws Exception {
        if (msg.equals(EXIT)){
            return false;
        }
        clientHandle.sendMsg(msg);
        return true;
    }

}
複製代碼

客戶端主體代碼:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by innoyiya on 2018/8/20.
 */

public class ClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean started;

    public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //建立選擇器
            selector = Selector.open();
            //打開監聽通道
            socketChannel = SocketChannel.open();
            //若是爲 true,則此通道將被置於阻塞模式;若是爲 false,則此通道將被置於非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void stop(){
        started = false;
    }
    
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                if(sc.finishConnect()){
                    System.out.println("已鏈接事件");
                }
                else{
                    System.exit(1);
                }
            }
            //讀消息
            if(key.isReadable()){
                //建立ByteBuffer,並開闢一個1M的緩衝區
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //讀取請求碼流,返回讀取到的字節數
                int readBytes = sc.read(buffer);
                //讀取到字節,對字節進行編解碼
                if(readBytes>0){
                    //將緩衝區當前的limit設置爲position=0,用於後續對緩衝區的讀取操做
                    buffer.flip();
                    //根據緩衝區可讀字節數建立字節數組
                    byte[] bytes = new byte[buffer.remaining()];
                    //將緩衝區可讀字節數組複製到新建的數組中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客戶端收到消息:" + result);
                } else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }
    //異步發送消息
    private void doWrite(SocketChannel channel,String request) throws IOException{
        byte[] bytes = request.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        //flip操做
        writeBuffer.flip();
        //發送緩衝區的字節數組
        channel.write(writeBuffer);

    }
    private void doConnect() throws IOException{
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            System.out.println("connect");
        }
        else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("register");
        }
    }
    public void sendMsg(String msg) throws Exception{
        //覆蓋其以前感興趣的事件,將其更改成OP_READ
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }

    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循環遍歷selector
        while(started){
            try{
                selector.select();

                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key ;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector關閉後會自動釋放裏面管理的資源
        if(selector != null){
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}
複製代碼

測試類:

import java.util.Scanner;

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Test {
    public static void main(String[] args) throws Exception {
        Service.start();
        Thread.sleep(1000);
        Client.start();
        while(Client.sendMsg(new Scanner(System.in).nextLine()));
    }
}
複製代碼

控制檯打印:

服務器已啓動,端口號爲:12345
register
已鏈接事件
1234
服務器收到的信息:1234
客戶端收到消息:+++++1234+++++
5678
服務器收到的信息:5678
客戶端收到消息:+++++5678+++++
複製代碼

若有不妥之處,請告訴我。

相關文章
相關標籤/搜索