上章提到過Java的NIO採起的是多路IO複用模式,其衍生出來的模型就是Reactor模型。多路IO複用有兩種方式,一種是select/poll,另外一種是epoll。在windows系統上使用的是select/poll方式,在linux上使用的是epoll方式,主要是因爲DefaultSelectorProvider具體選擇的selector決定。epoll是在linux2.6以後才支持的,select的方式時間複雜度爲O(N),最大fd限制是1024。epoll沒有數量限制,時間複雜度是O(1)。html
再溫習一遍多路IO複用的基本思路,阻塞發生在select上面,select管理全部註冊在其上的socket請求,socket準備徹底就會交由用戶程序處理。下面結合java的nio例子,來更細緻的講解一下這種模式,強化理解一下。要寫出java的nio不難但要徹底正確毫不容易,相關概念不清楚就會產生難以理解的bug,這裏有一些相關的陷阱。java
另外說明一下,這個例子不必定徹底正確,用於演示足夠了。對於Java的NIO而言,有幾個概念比較重要,這裏先提兩個channel和buffer。不論是客戶端發送服務端接收,仍是服務端發送客戶端接收,基本的流程都是:發送方發送數據->buffer->發送方channel->接收方channel->buffer->接收方接收數據。linux
對於服務端而言首先須要的就是肯定監聽的端口,其次是與之對應的channel,然後就是selector,最後還須要一個線程池。爲何會須要線程池呢?道理很簡單,select模式獲取了全部channel的change,對於服務端而言,change的可能有很是多的客戶端channel,而用戶程序只有一個線程,若是這麼多個channel一個個順序執行,若是有耗時嚴重的操做,那麼後果是很是糟糕的,全部客戶端都會延時處理,這也是多路IO複用的一個糟糕點。線程池就是爲每一個客戶端分配一個線程去處理,減緩這種狀況的後果。Server的基本四個內容就出來了:windows
private int port; private Selector selector; private ServerSocketChannel serverSocketChannel; private ExecutorService executorService;
接下來就是初始化服務端。初始化的步驟也是通常化:1.初始化鏈接池;2.初始化Selector;3.初始化綁定端口;4.將socket註冊到select上。大體步驟就是這些,可是還有些額外的細節。具體代碼以下:服務器
1. executorService = Executors.newCachedThreadPool(); 2. selector = Selector.open(); 3. serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); 4. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
這裏的一個細節就是socket必須是非阻塞模式。初始化完成以後就是正式的邏輯了,再來回憶一下多路IO複用的邏輯,管理多個IO的change事件,阻塞在select上,若是有change事件,select就能繼續執行下去,選出change了的IO,只對這部分IO進行操做。這段描述就下面這段簡單的代碼了:多線程
int event = selector.select(); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); } }
這裏就是調用selector.select()方法進行阻塞,若是change事件不爲0(這個判斷應該去掉好點),獲取當前全部change事件。遍歷處理,移除該事件。不移除,下次該事件依舊存在,至關於認爲是沒處理,會出現屢次觸發錯誤。異步
下面詳細介紹一下事件的類型,Java定義了4種類型:socket
1.針對服務端的ACCEPT事件,接收到客戶端的鏈接請求;ide
2.針對客戶端的CONNECT事件,發起對服務端的鏈接請求;性能
3.針對獲取對端發送的數據的READ事件;
4.針對請求發送數據給對端時準備好了緩衝區的WRITE事件。
其中WRITE事件通常不進行使用,由於大部分狀況緩衝區都是空閒的,會馬上觸發該事件,這個浪費CPU的性能,還會形成bug。下面代碼就是server端處理的一個基本邏輯,也是有些要注意的點。
if(key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } else if(key.isValid() && key.isReadable()) { key.interestOps(0); executorService.execute(new Task(key)); }
服務端的事件就3個,write事件不用管,因此只須要關注accept和read事件。有請求進來,就接收這個請求,設置成非阻塞式,再註冊到selector中,監聽該請求的讀事件。讀事件到來,先將監聽的時間改爲無,這裏是由於異步執行,可能沒有讀完數據,再次觸發了該channel的讀事件,重複讀取,形成問題。Task就是一個runnabel任務,處理讀取,發送應答,這裏還須要從新將監聽的事件改爲讀事件,即處理完了本次內容,等待下次內容。
Task的具體內容以下:
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; try { while((size = channel.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } if(baos.size() == 0) { key.cancel(); } else { // 協議解析 String msg = new String(baos.toByteArray(), "UTF-8"); // 返回該數據 String reply = "get client msg: " + msg; ByteBuffer re = ByteBuffer.wrap(reply.getBytes()); while(re.hasRemaining()) { channel.write(re); } // 處理完畢後後設置成讀狀態,繼續獲取相關數據 key.interestOps(SelectionKey.OP_READ); key.selector().wakeup(); } } catch (Exception e) { key.cancel(); // 異常鏈接中斷 }
這裏的邏輯就是使用buffer將數據取出來了。取出爲0,或者拋出異常,意味着客戶端斷開了鏈接,直接取消掉該channel的管理。回寫了一個數據。以後就是將事件監聽設置回監聽讀取事件,最後一步須要wakeup一下。wakeup是爲了喚醒一下select,緣由以下:這個是因爲前面先將監聽的事件改爲了0,後面才改回了read事件。不論是怎麼修改,都不是馬上生效的,須要下次select事件觸發才能生效,問題也只會出在多線程中。試想一下下面這個過程:
1.A通道有數據了,A先置爲0了,開始讀取數據,由於是異步的,因此又走到了select阻塞了;
2.B鏈接進來,觸發的select方法,這時A的0才正式生效,這也是咱們想要的,由於A以前的數據還在處理,並非新的數據到來,不須要再次觸發讀操做。這裏先置爲0的動做是正確的。
3.此時主線程又走到了select方法阻塞了,注意,此時A生效的是0,A結束這次讀操做,等待下次讀事件。問題就出在這裏,若是不觸發一下select方法,此時A即便有新的讀事件,其也不會觸發,由於重置爲read並無生效,要等select觸發才能生效。這就至關於A沒接到消息了,若是B有讀事件,觸發了select方法,則A才能接到消息。wakeup在這裏必須添加的目的就是強制觸發一下select,使A更新回read事件,而不是不關係任何事件。
實際上觸發沒有這麼麻煩,在客戶端還會說到這個問題,有更簡單的觸發方法。
上面的代碼也能夠看出nio都是基於buffer操做的。buffer也有不少陷阱,使用正確不容易。下面給出一個個人完整例子,能夠運行試試,不保證沒bug。瞭解了上面的知識,測出bug調試應該也不難。
import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NioServer { private int port; private Selector selector; private ServerSocketChannel serverSocketChannel; private ExecutorService executorService; public NioServer(int port) { this.port = port; } public void open() { this.executorService = Executors.newCachedThreadPool(); try { this.selector = Selector.open(); this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server端啓動..."); for(;;) { System.out.println("======>select的keys數量:" + selector.keys().size()); int event = selector.select(); System.out.println("======>select的keys數量:" + selector.keys().size() + ", change事件數量:" + event); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); // System.out.println("======>真實未處理的change事件數量:" + keys.size()); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); // 移除這個key if(key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); System.out.println("===>獲取client鏈接,準備讀取數據:"+ client.socket().getRemoteSocketAddress()); } else if(key.isValid() && key.isReadable()) { // 先置爲0,防止異步線程未處理完該事件又被select // key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps(0); executorService.execute(new Task(key)); } else { System.out.println("其它事件:" + key.interestOps()); } } } } } catch (Exception e) { e.printStackTrace(); } } private class Task implements Runnable { private SelectionKey key; public Task(SelectionKey key) { this.key = key; } @Override public void run() { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; try { // System.out.println("===>開始讀取數據"); while((size = channel.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } if(baos.size() == 0) { key.cancel(); System.out.println("======<client斷開鏈接:"+ channel.socket().getRemoteSocketAddress()); } else { // 協議解析 String msg = new String(baos.toByteArray(), "UTF-8"); System.out.println("===>獲取client數據: " + msg); // 返回該數據 String reply = "get client msg: " + msg; ByteBuffer re = ByteBuffer.wrap(reply.getBytes()); while(re.hasRemaining()) { channel.write(re); } // 處理完畢後後設置成讀狀態,繼續獲取相關數據 // key.interestOps((key.interestOps() | SelectionKey.OP_READ)); key.interestOps(SelectionKey.OP_READ); key.selector().wakeup(); System.out.println("===<返回server的獲取結果"); } } catch (Exception e) { key.cancel(); // 異常鏈接中斷 System.out.println("======<異常client斷開鏈接:"+ channel.socket().getRemoteSocketAddress()); } } } public static void main(String[] args) { NioServer nioServer = new NioServer(7777); nioServer.open(); } }
第一節說過,在單個鏈接的時候,多路IO複用方式甚至沒有阻塞式IO性能好,多路IO複用是針對了多個IO操做。這裏仍是給出客戶端的NIO寫法。一樣的,客戶端須要上面的內容,不包括線程池,咱們只處理一個客戶端鏈接。須要增長的一個字段就是服務端地址,因此總共也是4個內容:服務端地址、端口、鏈接通道、select。
private String host; private int port; private SocketChannel socketChannel; private Selector selector;
初始化也是基本操做:1.獲取select;2.創建鏈接;3.註冊到select
1. selector = Selector.open(); 2. socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(host, port)); 3. socketChannel.register(selector, SelectionKey.OP_CONNECT);
這裏要注意的也就是要以非阻塞式的方式進行。後面的步驟也同樣,進行select,獲取change事件,根據不一樣的事件處理不一樣。write事件不使用,客戶端關注的就connect和read事件了。
int event = selector.select(); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if(sk.isValid() && sk.isConnectable()) { if(socketChannel.isConnectionPending()) { if(socketChannel.finishConnect()) { sk.interestOps(SelectionKey.OP_READ); } else { sk.cancel(); } } } else if(sk.isValid() && sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; while((size = sc.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } } } }
這裏要注意的是connect並無真正連上,要觸發了connect事件,執行finishConnect纔會鏈接成功。鏈接成功後更新成read事件。這裏會有一個疑惑,server端的時候intersetOps設置成0或者read不是直接生效,要select執行後才能生效,爲何這邊connect設置成read事件就能直接改過來???...這是一個思惟陷阱:不是要執行後才能改變狀態,而是select認準的狀態是select操做以前一瞬間的狀態。server端的例子,哪怕不須要兩個線程,單個線程也能觸發,只要是異步操做。主線程先接收到A的讀取操做,設置A成0,而後又進行select了,此一瞬間A的狀態是0,後面A處理完後,再來一條消息就沒用了,由於此時select阻塞時檢測的狀態是0,後續改過來也沒用,因此才須要wakeup一下,讓其認識到其狀態應該修改後的read。而上述例子爲何不須要,就是由於這是一個同步的過程,這次connect事件,下次再select的時候必定變成了read。
其餘的也沒有什麼值得一提的了,下面是客戶端的完整代碼。
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class NioClient { private String host; private int port; private SocketChannel socketChannel; private Selector selector; public NioClient(String host, int port) { this.host = host; this.port = port; } public void open() { try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(host, port)); socketChannel.register(selector, SelectionKey.OP_CONNECT); System.out.println("client端啓動..."); for(;;) { System.out.println("======>select的keys數量:" + selector.keys().size()); int event = selector.select(); System.out.println("======>select的keys數量:" + selector.keys().size() + ", change事件數量:" + event); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if(sk.isValid() && sk.isConnectable()) { if(socketChannel.isConnectionPending()) { if(socketChannel.finishConnect()) { sk.interestOps(SelectionKey.OP_READ); System.out.println("鏈接上遠程服務器:" + socketChannel.getRemoteAddress()); } else { sk.cancel(); System.out.println("鏈接未創建..."); } } } else if(sk.isValid() && sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; while((size = sc.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } System.out.println("接收服務器消息:" + new String(baos.toByteArray(), "UTF-8")); } else { System.out.println("其它事件:" + sk.interestOps()); } } } } } catch (IOException e) { e.printStackTrace(); } } public void close() { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public void send(String msg) { byte[] b = msg.getBytes(); ByteBuffer buffer = ByteBuffer.wrap(b); try { while (buffer.hasRemaining()) { socketChannel.write(buffer); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NioClient client = new NioClient("127.0.0.1", 7777); Thread thread = new Thread(new Runnable() { @Override public void run() { client.open(); } }); thread.setDaemon(true); thread.start(); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String msg = scanner.nextLine(); if("close".equals(msg)) { client.close(); System.out.println("退出成功"); break; } else { client.send(msg); } } } }
此章結合java nio的實際demo增強一下對多路IO複用的理解,理解Java的nio基本流程,對於理解後面的netty設計的結構有很大的幫助。