漫談NIO(2)之Java的NIO

1.前言

    上章提到過Java的NIO採起的是多路IO複用模式,其衍生出來的模型就是Reactor模型。多路IO複用有兩種方式,一種是select/poll,另外一種是epoll。在windows系統上使用的是select/poll方式,在linux上使用的是epoll方式,主要是因爲DefaultSelectorProvider具體選擇的selector決定。epoll是在linux2.6以後才支持的,select的方式時間複雜度爲O(N),最大fd限制是1024。epoll沒有數量限制,時間複雜度是O(1)。html

    再溫習一遍多路IO複用的基本思路,阻塞發生在select上面,select管理全部註冊在其上的socket請求,socket準備徹底就會交由用戶程序處理。下面結合java的nio例子,來更細緻的講解一下這種模式,強化理解一下。要寫出java的nio不難但要徹底正確毫不容易,相關概念不清楚就會產生難以理解的bug,這裏有一些相關的陷阱。java

    另外說明一下,這個例子不必定徹底正確,用於演示足夠了。對於Java的NIO而言,有幾個概念比較重要,這裏先提兩個channel和buffer。不論是客戶端發送服務端接收,仍是服務端發送客戶端接收,基本的流程都是:發送方發送數據->buffer->發送方channel->接收方channel->buffer->接收方接收數據。linux

2.具體實現

2.1 服務端

    對於服務端而言首先須要的就是肯定監聽的端口,其次是與之對應的channel,然後就是selector,最後還須要一個線程池。爲何會須要線程池呢?道理很簡單,select模式獲取了全部channel的change,對於服務端而言,change的可能有很是多的客戶端channel,而用戶程序只有一個線程,若是這麼多個channel一個個順序執行,若是有耗時嚴重的操做,那麼後果是很是糟糕的,全部客戶端都會延時處理,這也是多路IO複用的一個糟糕點。線程池就是爲每一個客戶端分配一個線程去處理,減緩這種狀況的後果。Server的基本四個內容就出來了:windows

    private int port;
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private ExecutorService executorService;

    接下來就是初始化服務端。初始化的步驟也是通常化:1.初始化鏈接池;2.初始化Selector;3.初始化綁定端口;4.將socket註冊到select上。大體步驟就是這些,可是還有些額外的細節。具體代碼以下:服務器

1. executorService = Executors.newCachedThreadPool();
2. selector = Selector.open();
3. serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.socket().bind(new InetSocketAddress(port));
4. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

 這裏的一個細節就是socket必須是非阻塞模式。初始化完成以後就是正式的邏輯了,再來回憶一下多路IO複用的邏輯,管理多個IO的change事件,阻塞在select上,若是有change事件,select就能繼續執行下去,選出change了的IO,只對這部分IO進行操做。這段描述就下面這段簡單的代碼了:多線程

int event = selector.select();
if(event != 0) {
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    while(it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();
    }      
}

    這裏就是調用selector.select()方法進行阻塞,若是change事件不爲0(這個判斷應該去掉好點),獲取當前全部change事件。遍歷處理,移除該事件。不移除,下次該事件依舊存在,至關於認爲是沒處理,會出現屢次觸發錯誤。異步

    下面詳細介紹一下事件的類型,Java定義了4種類型:socket

  1.針對服務端的ACCEPT事件,接收到客戶端的鏈接請求;ide

  2.針對客戶端的CONNECT事件,發起對服務端的鏈接請求;性能

  3.針對獲取對端發送的數據的READ事件;

  4.針對請求發送數據給對端時準備好了緩衝區的WRITE事件。

    其中WRITE事件通常不進行使用,由於大部分狀況緩衝區都是空閒的,會馬上觸發該事件,這個浪費CPU的性能,還會形成bug。下面代碼就是server端處理的一個基本邏輯,也是有些要注意的點。

if(key.isValid() && key.isAcceptable()) {
     ServerSocketChannel server = (ServerSocketChannel) key.channel();
     SocketChannel client = server.accept();
     client.configureBlocking(false);
     client.register(selector, SelectionKey.OP_READ); 
} else if(key.isValid() && key.isReadable()) {
      key.interestOps(0);
      executorService.execute(new Task(key));
}

  服務端的事件就3個,write事件不用管,因此只須要關注accept和read事件。有請求進來,就接收這個請求,設置成非阻塞式,再註冊到selector中,監聽該請求的讀事件。讀事件到來,先將監聽的時間改爲無,這裏是由於異步執行,可能沒有讀完數據,再次觸發了該channel的讀事件,重複讀取,形成問題。Task就是一個runnabel任務,處理讀取,發送應答,這裏還須要從新將監聽的事件改爲讀事件,即處理完了本次內容,等待下次內容。

    Task的具體內容以下:

            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            int size = -1;
            try {
                while((size = channel.read(buffer)) > 0) {
                    buffer.flip();
                    baos.write(buffer.array(), 0, size);
                    buffer.clear();
                }
                if(baos.size() == 0) {
                    key.cancel();
                } else {
                    // 協議解析
                    String msg = new String(baos.toByteArray(), "UTF-8");
                    // 返回該數據
                    String reply = "get client msg: " + msg;
                    ByteBuffer re = ByteBuffer.wrap(reply.getBytes());
                    while(re.hasRemaining()) {
                        channel.write(re);
                    }
                    // 處理完畢後後設置成讀狀態,繼續獲取相關數據
                    key.interestOps(SelectionKey.OP_READ);
                    key.selector().wakeup();
                }
            } catch (Exception e) {
                key.cancel();   // 異常鏈接中斷
            }    

    這裏的邏輯就是使用buffer將數據取出來了。取出爲0,或者拋出異常,意味着客戶端斷開了鏈接,直接取消掉該channel的管理。回寫了一個數據。以後就是將事件監聽設置回監聽讀取事件,最後一步須要wakeup一下。wakeup是爲了喚醒一下select,緣由以下:這個是因爲前面先將監聽的事件改爲了0,後面才改回了read事件。不論是怎麼修改,都不是馬上生效的,須要下次select事件觸發才能生效,問題也只會出在多線程中。試想一下下面這個過程:

  1.A通道有數據了,A先置爲0了,開始讀取數據,由於是異步的,因此又走到了select阻塞了;

  2.B鏈接進來,觸發的select方法,這時A的0才正式生效,這也是咱們想要的,由於A以前的數據還在處理,並非新的數據到來,不須要再次觸發讀操做。這裏先置爲0的動做是正確的。

  3.此時主線程又走到了select方法阻塞了,注意,此時A生效的是0,A結束這次讀操做,等待下次讀事件。問題就出在這裏,若是不觸發一下select方法,此時A即便有新的讀事件,其也不會觸發,由於重置爲read並無生效,要等select觸發才能生效。這就至關於A沒接到消息了,若是B有讀事件,觸發了select方法,則A才能接到消息。wakeup在這裏必須添加的目的就是強制觸發一下select,使A更新回read事件,而不是不關係任何事件。

  實際上觸發沒有這麼麻煩,在客戶端還會說到這個問題,有更簡單的觸發方法。

    上面的代碼也能夠看出nio都是基於buffer操做的。buffer也有不少陷阱,使用正確不容易。下面給出一個個人完整例子,能夠運行試試,不保證沒bug。瞭解了上面的知識,測出bug調試應該也不難。

import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NioServer {

    private int port;
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private ExecutorService executorService;

    public NioServer(int port) {
        this.port = port;
    }

    public void open() {
        this.executorService = Executors.newCachedThreadPool();
        try {
            this.selector = Selector.open();
            this.serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server端啓動...");
            for(;;) {
                System.out.println("======>select的keys數量:" + selector.keys().size());
                int event = selector.select();
                System.out.println("======>select的keys數量:" + selector.keys().size() + ", change事件數量:" + event);
                if(event != 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
//                    System.out.println("======>真實未處理的change事件數量:" + keys.size());
                    while(it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();    // 移除這個key
                        if(key.isValid() && key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel client = server.accept();
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                            System.out.println("===>獲取client鏈接,準備讀取數據:"+ client.socket().getRemoteSocketAddress());
                        } else if(key.isValid() && key.isReadable()) {
                            // 先置爲0,防止異步線程未處理完該事件又被select
//                            key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
                            key.interestOps(0);
                            executorService.execute(new Task(key));
                        } else {
                            System.out.println("其它事件:" + key.interestOps());
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private class Task implements Runnable {

        private SelectionKey key;

        public Task(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            int size = -1;
            try {
//                System.out.println("===>開始讀取數據");
                while((size = channel.read(buffer)) > 0) {
                    buffer.flip();
                    baos.write(buffer.array(), 0, size);
                    buffer.clear();
                }
                if(baos.size() == 0) {
                    key.cancel();
                    System.out.println("======<client斷開鏈接:"+ channel.socket().getRemoteSocketAddress());
                } else {
                    // 協議解析
                    String msg = new String(baos.toByteArray(), "UTF-8");
                    System.out.println("===>獲取client數據: " + msg);
                    // 返回該數據
                    String reply = "get client msg: " + msg;
                    ByteBuffer re = ByteBuffer.wrap(reply.getBytes());
                    while(re.hasRemaining()) {
                        channel.write(re);
                    }
                    // 處理完畢後後設置成讀狀態,繼續獲取相關數據
//                    key.interestOps((key.interestOps() | SelectionKey.OP_READ));
                    key.interestOps(SelectionKey.OP_READ);
                    key.selector().wakeup();
                    System.out.println("===<返回server的獲取結果");
                }
            } catch (Exception e) {
                key.cancel();   // 異常鏈接中斷
                System.out.println("======<異常client斷開鏈接:"+ channel.socket().getRemoteSocketAddress());
            }
        }
    }

    public static void main(String[] args) {
        NioServer nioServer = new NioServer(7777);
        nioServer.open();
    }
}

2.2 客戶端

     第一節說過,在單個鏈接的時候,多路IO複用方式甚至沒有阻塞式IO性能好,多路IO複用是針對了多個IO操做。這裏仍是給出客戶端的NIO寫法。一樣的,客戶端須要上面的內容,不包括線程池,咱們只處理一個客戶端鏈接。須要增長的一個字段就是服務端地址,因此總共也是4個內容:服務端地址、端口、鏈接通道、select。

    private String host;
    private int port;
    private SocketChannel socketChannel;
    private Selector selector;

    初始化也是基本操做:1.獲取select;2.創建鏈接;3.註冊到select

1.    selector = Selector.open();
2.    socketChannel = SocketChannel.open();
      socketChannel.configureBlocking(false);
      socketChannel.socket().setTcpNoDelay(true);
      socketChannel.connect(new InetSocketAddress(host, port));
3.    socketChannel.register(selector, SelectionKey.OP_CONNECT);

    這裏要注意的也就是要以非阻塞式的方式進行。後面的步驟也同樣,進行select,獲取change事件,根據不一樣的事件處理不一樣。write事件不使用,客戶端關注的就connect和read事件了。

                int event = selector.select();
                if(event != 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while(it.hasNext()) {
                        SelectionKey sk = it.next();
                        it.remove();
                        if(sk.isValid() && sk.isConnectable()) {
                            if(socketChannel.isConnectionPending()) {
                                if(socketChannel.finishConnect()) {
                                    sk.interestOps(SelectionKey.OP_READ);
                                } else {
                                    sk.cancel();
                                }
                            }
                        } else if(sk.isValid() && sk.isReadable()) {
                            SocketChannel sc = (SocketChannel) sk.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            ByteArrayOutputStream baos = new ByteArrayOutputStream();
                            int size = -1;
                            while((size = sc.read(buffer)) > 0) {
                                buffer.flip();
                                baos.write(buffer.array(), 0, size);
                                buffer.clear();
                            }
                        }
                    }
                }    

    這裏要注意的是connect並無真正連上,要觸發了connect事件,執行finishConnect纔會鏈接成功。鏈接成功後更新成read事件。這裏會有一個疑惑,server端的時候intersetOps設置成0或者read不是直接生效,要select執行後才能生效,爲何這邊connect設置成read事件就能直接改過來???...這是一個思惟陷阱:不是要執行後才能改變狀態,而是select認準的狀態是select操做以前一瞬間的狀態。server端的例子,哪怕不須要兩個線程,單個線程也能觸發,只要是異步操做。主線程先接收到A的讀取操做,設置A成0,而後又進行select了,此一瞬間A的狀態是0,後面A處理完後,再來一條消息就沒用了,由於此時select阻塞時檢測的狀態是0,後續改過來也沒用,因此才須要wakeup一下,讓其認識到其狀態應該修改後的read。而上述例子爲何不須要,就是由於這是一個同步的過程,這次connect事件,下次再select的時候必定變成了read。

    其餘的也沒有什麼值得一提的了,下面是客戶端的完整代碼。

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class NioClient {

    private String host;
    private int port;
    private SocketChannel socketChannel;
    private Selector selector;

    public NioClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void open() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.socket().setTcpNoDelay(true);
            socketChannel.connect(new InetSocketAddress(host, port));
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("client端啓動...");
            for(;;) {
                System.out.println("======>select的keys數量:" + selector.keys().size());
                int event = selector.select();
                System.out.println("======>select的keys數量:" + selector.keys().size() + ", change事件數量:" + event);
                if(event != 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while(it.hasNext()) {
                        SelectionKey sk = it.next();
                        it.remove();
                        if(sk.isValid() && sk.isConnectable()) {
                            if(socketChannel.isConnectionPending()) {
                                if(socketChannel.finishConnect()) {
                                    sk.interestOps(SelectionKey.OP_READ);
                                    System.out.println("鏈接上遠程服務器:" + socketChannel.getRemoteAddress());
                                } else {
                                    sk.cancel();
                                    System.out.println("鏈接未創建...");
                                }
                            }
                        } else if(sk.isValid() && sk.isReadable()) {
                            SocketChannel sc = (SocketChannel) sk.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            ByteArrayOutputStream baos = new ByteArrayOutputStream();
                            int size = -1;
                            while((size = sc.read(buffer)) > 0) {
                                buffer.flip();
                                baos.write(buffer.array(), 0, size);
                                buffer.clear();
                            }
                            System.out.println("接收服務器消息:" + new String(baos.toByteArray(), "UTF-8"));
                        } else {
                            System.out.println("其它事件:" + sk.interestOps());
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void send(String msg) {
        byte[] b = msg.getBytes();
        ByteBuffer buffer = ByteBuffer.wrap(b);
        try {
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NioClient client = new NioClient("127.0.0.1", 7777);
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                client.open();
            }
        });
        thread.setDaemon(true);
        thread.start();
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String msg = scanner.nextLine();
            if("close".equals(msg)) {
                client.close();
                System.out.println("退出成功");
                break;
            } else {
                client.send(msg);
            }
        }
    }
}

3.總結

    此章結合java nio的實際demo增強一下對多路IO複用的理解,理解Java的nio基本流程,對於理解後面的netty設計的結構有很大的幫助。

相關文章
相關標籤/搜索