咱們熟知的Socket編程就是BIO,每一個請求對應一個線程去處理。一個socket鏈接一個處理線程(這個線程負責這個Socket鏈接的一系列數據傳輸操做)。阻塞的緣由在於:操做系統容許的線程數量是有限的,多個socket申請與服務端創建鏈接時,服務端不能提供相應數量的處理線程,沒有分配處處理線程的鏈接就會阻塞等待或被拒絕。java
以下圖就是BIO(1:1同步阻塞)通訊模型,每當有一個請求過來,都會建立新的線程,當線程數達到必定數量,佔滿了整臺機器的資源,那麼機器就掛掉了。對於CPU來講也是一個很差的事情,由於會致使頻繁的切換上下文。react
那麼咱們有以下的改進措施(M:N同步阻塞IO),可是仍是有上面的一些問題,僅僅是解決了頻繁的建立線程的問題,不過因爲是同步,若是讀寫速度慢,那麼每一個線程進來是會致使阻塞的,性能的高低徹底取決於阻塞的時間。這個對於用戶的體驗也是至關很差的。linux
NIO 是一種同步非阻塞的 IO 模型。同步是指線程不斷輪詢 IO 事件是否就緒,非阻塞是指線程在等待 IO 的時候,能夠同時作其餘任務。同步的核心就是 Selector,Selector 代替了線程自己輪詢 IO 事件,避免了阻塞同時減小了沒必要要的線程消耗;非阻塞的核心就是通道和緩衝區,當 IO 事件就緒時,能夠經過寫道緩衝區,保證 IO 的成功,而無需線程阻塞式地等待。編程
非阻塞式IO模型(NIO)NIO+單線程Reactor模式:reactor設計模式是event-driven architecture的一種實現方式,處理多個客戶端併發的向服務端請求服務的場景。每種服務在服務端可能由多個方法組成。reactor會解耦併發請求的服務並分發給對應的事件處理器來處理。設計模式
reactor主要由如下幾個角色構成:handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handlerapi
處理流程:數組
這種模型狀況下,因爲 acceptor 是單線程的,既要接受請求,還要去處理時間,若是某一些事件處理請求花費的時間比較長,那麼這個請求將會進入等待,整個狀況下會同步。基於這種問題下咱們有什麼改進措施呢?ruby
非阻塞式IO模型(NIO)NIO+多線程Reactor模式:可使用多線程去處理,使用線程池,讓acceptor僅僅去接受請求,把事件的處理交給線程池中的線程去處理:服務器
那麼在這種狀況下還存在哪些弊端呢?將處理器的執行放入線程池,多線程進行業務處理。但Reactor仍爲單個線程。仍是acceptor是單線程的,沒法去並行的去響應多個客戶端,那麼要怎麼處理呢?網絡
NIO+主從多線程Reactor模式:
mainReactor負責監聽鏈接,accept鏈接給subReactor處理,爲何要單獨分一個Reactor來處理監聽呢?由於像TCP這樣須要通過3次握手才能創建鏈接,這個創建鏈接的過程也是要耗時間和資源的,單獨分一個Reactor來處理,能夠提升性能。
異步阻塞IO(AIO):
NIO是同步的IO,是由於程序須要IO操做時,必須得到了IO權限後親自進行IO操做才能進行下一步操做。AIO是對NIO的改進(因此AIO又叫NIO.2),它是基於Proactor模型的。每一個socket鏈接在事件分離器註冊 IO完成事件 和 IO完成事件處理器。程序須要進行IO時,向分離器發出IO請求並把所用的Buffer區域告知分離器,分離器通知操做系統進行IO操做,操做系統本身不斷嘗試獲取IO權限並進行IO操做(數據保存在Buffer區),操做完成後通知分離器;分離器檢測到 IO完成事件,則激活 IO完成事件處理器,處理器會通知程序說「IO已完成」,程序知道後就直接從Buffer區進行數據的讀寫。
也就是說:AIO是發出IO請求後,由操做系統本身去獲取IO權限並進行IO操做;NIO則是發出IO請求後,由線程不斷嘗試獲取IO權限,獲取到後通知應用程序本身進行IO操做。
同步/異步:數據若是還沒有就緒,是否須要等待數據結果。
阻塞/非阻塞:進程/線程須要操做的數據若是還沒有就緒,是否妨礙了當前進程/線程的後續操做。應用程序的調用是否當即返回!
NIO與BIO最大的區別是 BIO是面向流的,而NIO是面向Buffer的。
NIO還提供了兩個新概念:Buffer和Channel:
Buffer: 是一塊連續的內存塊,是 NIO 數據讀或寫的中轉地。 爲何說NIO是基於緩衝區的IO方式呢?由於,當一個連接創建完成後,IO的數據未必會立刻到達,爲了當數據到達時可以正確完成IO操做,在BIO(阻塞IO)中,等待IO的線程必須被阻塞,以全天候地執行IO操做。爲了解決這種IO方式低效的問題,引入了緩衝區的概念,當數據到達時,能夠預先被寫入緩衝區,再由緩衝區交給線程,所以線程無需阻塞地等待IO。
Channel: 數據的源頭或者數據的目的地 ,用於向 buffer 提供數據或者讀取 buffer 數據 ,buffer 對象的惟一接口,異步 I/O 支持。
Buffer做爲IO流中數據的緩衝區,而Channel則做爲socket的IO流與Buffer的傳輸通道。客戶端socket與服務端socket之間的IO傳輸不直接把數據交給CPU使用,而是先通過Channel通道把數據保存到Buffer,而後CPU直接從Buffer區讀寫數據,一次能夠讀寫更多的內容。使用Buffer提升IO效率的緣由(這裏與IO流裏面的BufferedXXStream、BufferedReader、BufferedWriter提升性能的原理同樣):IO的耗時主要花在數據傳輸的路上,普通的IO是一個字節一個字節地傳輸,而採用了Buffer的話,經過Buffer封裝的方法(好比一次讀一行,則以行爲單位傳輸而不是一個字節一次進行傳輸)就能夠實現「一大塊字節」的傳輸。好比:IO就是送快遞,普通IO是一個快遞跑一趟,採用了Buffer的IO就是一車跑一趟。很明顯,buffer效率更高,花在傳輸路上的時間大大縮短。
面向buffer的通道,一個Channel(通道)表明和某一實體的鏈接,這個實體能夠是文件、網絡套接字等。也就是說,通道是Java NIO提供的一座橋樑,用於咱們的程序和操做系統底層I/O服務進行交互。通道是一種很基本很抽象的描述,和不一樣的I/O服務交互,執行不一樣的I/O操做,實現不同,所以具體的有FileChannel、SocketChannel,ServerSocketChannel,DatagramChannel等。通道使用起來跟Stream比較像,能夠讀取數據到Buffer中,也能夠把Buffer中的數據寫入通道。可是channel是雙向的,而stream是單向的。
在Java NIO中,若是兩個通道中有一個是FileChannel,那你能夠直接將數據從一個channel傳輸到另一個channel。對應的api是 transferFrom() 跟transferTo()。
buffer:
與Java基本類型相對應,NIO提供了多種 Buffer 類型,如ByteBuffer、CharBuffer、IntBuffer等,區別就是讀寫緩衝區時的單位長度不同(以對應類型的變量爲單位進行讀寫)。Buffer中有3個很重要的變量,它們是理解Buffer工做機制的關鍵,分別是capacity (總容量),position (指針當前位置),limit (讀/寫邊界位置)。
Buffer的工做方式跟C語言裏的字符數組很是的像,類比一下,capacity就是數組的總長度,position就是咱們讀/寫字符的下標變量,limit就是結束符的位置。Buffer初始時3個變量的狀況以下圖:
在對Buffer進行讀/寫的過程當中,position會日後移動,而 limit 就是 position 移動的邊界。由此不難想象,在對Buffer進行寫入操做時,limit應當設置爲capacity的大小,而對Buffer進行讀取操做時,limit應當設置爲數據的實際結束位置。(注意:將Buffer數據 寫入 通道是Buffer 讀取 操做,從通道 讀取 數據到Buffer是Buffer 寫入 操做)
在對Buffer進行讀/寫操做前,咱們能夠調用Buffer類提供的一些輔助方法來正確設置 position 和 limit 的值,主要有以下幾個:
java 層面中Buffer是一個頂層抽象類,咱們須要先了解一下經常使用的實現進行數據的編/解碼:
public class BufferDemo { public static void decode(String str) throws UnsupportedEncodingException { // 開闢一個長度爲128的字節空間 ByteBuffer byteBuffer = ByteBuffer.allocate(128); //寫入數據 byteBuffer.put(str.getBytes("UTF-8")); //寫完數據之後要進行讀取,須要設置 limit 爲 position 的值,而後 position 置爲0。 byteBuffer.flip(); /*獲取utf8的編解碼器*/ Charset utf8 = Charset.forName("UTF-8"); CharBuffer charBuffer = utf8.decode(byteBuffer);/*對bytebuffer中的內容解碼*/ /*array()返回的就是內部的數組引用,編碼之後的有效長度是0~limit*/ char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit()); System.out.println(charArr); } public static void encode(String str){ CharBuffer charBuffer = CharBuffer.allocate(128); charBuffer.append(str); charBuffer.flip(); /*對獲取utf8的編解碼器*/ Charset utf8 = Charset.forName("UTF-8"); ByteBuffer byteBuffer = utf8.encode(charBuffer); /*對charbuffer中的內容解碼*/ /*array()返回的就是內部的數組引用,編碼之後的有效長度是0~limit*/ byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit()); System.out.println(Arrays.toString(bytes)); } public static void main(String[] args) throws UnsupportedEncodingException { BufferDemo.decode("解碼測試"); BufferDemo.encode("編碼測試"); } }
再來看看 FileChannel 的簡單應用:
public class FileChannelDemo { public static void main(String[] args) throws Exception { /*-------從buffer往fileChannel中寫入數據-------------------------*/ File file =new File("D:/nio.data"); if(!file.exists()) {//判斷文件是否存在,不存在則建立 file.createNewFile(); } //獲取輸出流 FileOutputStream outputStream = new FileOutputStream(file); //從輸出流中獲取channel FileChannel writeFileChannel = outputStream.getChannel(); //開闢新的字節空間 ByteBuffer byteBuffer = ByteBuffer.allocate(128); //寫入數據 byteBuffer.put("fileChannel hello".getBytes("UTF-8")); //刷新指針 byteBuffer.flip(); //進行寫操做 writeFileChannel.write(byteBuffer); byteBuffer.clear(); outputStream.close(); writeFileChannel.close(); /*-------從fileChannel往buffer中寫入數據-------------------------*/ Path path = Paths.get("D:/nio.data"); FileChannel readFileChannel = FileChannel.open(path); ByteBuffer byteBuffer2 = ByteBuffer.allocate((int)readFileChannel.size()+1); Charset charset = Charset.forName("UTF-8"); readFileChannel.read(byteBuffer2); byteBuffer2.flip(); CharBuffer charBuffer = charset.decode(byteBuffer2); System.out.println(charBuffer.toString()); byteBuffer2.clear(); readFileChannel.close(); } }
selector:
Selector(選擇器)是一個特殊的組件,用於採集各個通道的狀態(或者說事件)。咱們先將通道註冊到選擇器,並設置好關心的事件,而後就能夠經過調用select()方法,靜靜地等待事件發生。通道有以下4個事件可供咱們監聽:
因爲若是用阻塞I/O,須要多線程(浪費內存),若是用非阻塞I/O,須要不斷重試(耗費CPU)。Selector的出現解決了這尷尬的問題,非阻塞模式下,經過Selector,咱們的線程只爲已就緒的通道工做,不用盲目的重試了。好比,當全部通道都沒有數據到達時,也就沒有Read事件發生,咱們的線程會在select()方法處被掛起,從而讓出了CPU資源。
結合上面的三大組件,來實現一下基本的NIO流程,服務端:
/*服務器端,:接收客戶端發送過來的數據並顯示, *服務器把上接收到的數據加上"echo from service:"再發送回去*/ public class ServiceSocketChannelDemo { public static class TCPEchoServer implements Runnable{ /*服務器地址*/ private InetSocketAddress localAddress; public TCPEchoServer(int port) throws IOException { this.localAddress = new InetSocketAddress(port); } @Override public void run(){ Charset utf8 = Charset.forName("UTF-8"); ServerSocketChannel ssc = null; Selector selector = null; Random rnd = new Random(); try { /*建立選擇器*/ selector = Selector.open(); /*建立服務器通道*/ ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); /*設置監聽服務器的端口,設置最大鏈接緩衝數爲100*/ ssc.bind(localAddress, 100); /*服務器通道只能對tcp連接事件感興趣*/ ssc.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e1) { System.out.println("server start failed"); return; } System.out.println("server start with address : " + localAddress); /*服務器線程被中斷後會退出*/ try{ while(!Thread.currentThread().isInterrupted()){ int n = selector.select(); if(n == 0){ continue; } Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); /*防止下次select方法返回已處理過的通道*/ it.remove(); /*若發現異常,說明客戶端鏈接出現問題,但服務器要保持正常*/ try{ /*ssc通道只能對連接事件感興趣*/ if(key.isAcceptable()){ /*accept方法會返回一個普統統道, 每一個通道在內核中都對應一個socket緩衝區*/ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); /*向選擇器註冊這個通道和普統統道感興趣的事件,同時提供這個新通道相關的緩衝區*/ int interestSet = SelectionKey.OP_READ; sc.register(selector, interestSet, new Buffers(256, 256)); System.out.println("accept from " + sc.getRemoteAddress()); } /*(普通)通道感興趣讀事件且有數據可讀*/ if(key.isReadable()){ /*經過SelectionKey獲取通道對應的緩衝區*/ Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); /*經過SelectionKey獲取對應的通道*/ SocketChannel sc = (SocketChannel) key.channel(); /*從底層socket讀緩衝區中讀入數據*/ sc.read(readBuffer); readBuffer.flip(); /*解碼顯示,客戶端發送來的信息*/ CharBuffer cb = utf8.decode(readBuffer); System.out.println(cb.array()); readBuffer.rewind(); /*準備好向客戶端發送的信息*/ /*先寫入"echo:",再寫入收到的信息*/ writeBuffer.put("echo from service:".getBytes("UTF-8")); writeBuffer.put(readBuffer); readBuffer.clear(); /*設置通道寫事件*/ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } /*通道感興趣寫事件且底層緩衝區有空閒*/ if(key.isWritable()){ Buffers buffers = (Buffers)key.attachment(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); writeBuffer.flip(); SocketChannel sc = (SocketChannel) key.channel(); int len = 0; while(writeBuffer.hasRemaining()){ len = sc.write(writeBuffer); /*說明底層的socket寫緩衝已滿*/ if(len == 0){ break; } } writeBuffer.compact(); /*說明數據所有寫入到底層的socket寫緩衝區*/ if(len != 0){ /*取消通道的寫事件*/ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } } }catch(IOException e){ System.out.println("service encounter client error"); /*若客戶端鏈接出現異常,從Seletcor中移除這個key*/ key.cancel(); key.channel().close(); } } Thread.sleep(rnd.nextInt(500)); } }catch(InterruptedException e){ System.out.println("serverThread is interrupted"); } catch (IOException e1) { System.out.println("serverThread selecotr error"); }finally{ try{ selector.close(); }catch(IOException e){ System.out.println("selector close failed"); }finally{ System.out.println("server close"); } } } } public static void main(String[] args) throws InterruptedException, IOException{ Thread thread = new Thread(new TCPEchoServer(8080)); thread.start(); Thread.sleep(100000); /*結束服務器線程*/ thread.interrupt(); } }
Buffers:
/*自定義Buffer類中包含讀緩衝區和寫緩衝區,用於註冊通道時的附加對象*/ public class Buffers { ByteBuffer readBuffer; ByteBuffer writeBuffer; public Buffers(int readCapacity, int writeCapacity){ readBuffer = ByteBuffer.allocate(readCapacity); writeBuffer = ByteBuffer.allocate(writeCapacity); } public ByteBuffer getReadBuffer(){ return readBuffer; } public ByteBuffer gerWriteBuffer(){ return writeBuffer; } }
客戶端:
/*客戶端:客戶端每隔1~2秒自動向服務器發送數據,接收服務器接收到數據並顯示*/ public class ClientSocketChannelDemo { public static class TCPEchoClient implements Runnable{ /*客戶端線程名*/ private String name; private Random rnd = new Random(); /*服務器的ip地址+端口port*/ private InetSocketAddress remoteAddress; public TCPEchoClient(String name, InetSocketAddress remoteAddress){ this.name = name; this.remoteAddress = remoteAddress; } @Override public void run(){ /*建立解碼器*/ Charset utf8 = Charset.forName("UTF-8"); Selector selector; try { /*建立TCP通道*/ SocketChannel sc = SocketChannel.open(); /*設置通道爲非阻塞*/ sc.configureBlocking(false); /*建立選擇器*/ selector = Selector.open(); /*註冊感興趣事件*/ int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; /*向選擇器註冊通道*/ sc.register(selector, interestSet, new Buffers(256, 256)); /*向服務器發起鏈接,一個通道表明一條tcp連接*/ sc.connect(remoteAddress); /*等待三次握手完成*/ while(!sc.finishConnect()){ ; } System.out.println(name + " " + "finished connection"); } catch (IOException e) { System.out.println("client connect failed"); return; } /*與服務器斷開或線程被中斷則結束線程*/ try{ int i = 1; while(!Thread.currentThread().isInterrupted()){ /*阻塞等待*/ selector.select(); /*Set中的每一個key表明一個通道*/ Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); /*遍歷每一個已就緒的通道,處理這個通道已就緒的事件*/ while(it.hasNext()){ SelectionKey key = it.next(); /*防止下次select方法返回已處理過的通道*/ it.remove(); /*經過SelectionKey獲取對應的通道*/ Buffers buffers = (Buffers)key.attachment(); ByteBuffer readBuffer = buffers.getReadBuffer(); ByteBuffer writeBuffer = buffers.gerWriteBuffer(); /*經過SelectionKey獲取通道對應的緩衝區*/ SocketChannel sc = (SocketChannel) key.channel(); /*表示底層socket的讀緩衝區有數據可讀*/ if(key.isReadable()){ /*從socket的讀緩衝區讀取到程序定義的緩衝區中*/ sc.read(readBuffer); readBuffer.flip(); /*字節到utf8解碼*/ CharBuffer cb = utf8.decode(readBuffer); /*顯示接收到由服務器發送的信息*/ System.out.println(cb.array()); readBuffer.clear(); } /*socket的寫緩衝區可寫*/ if(key.isWritable()){ writeBuffer.put((name + " " + i).getBytes("UTF-8")); writeBuffer.flip(); /*將程序定義的緩衝區中的內容寫入到socket的寫緩衝區中*/ sc.write(writeBuffer); writeBuffer.clear(); i++; } } Thread.sleep(1000 + rnd.nextInt(1000)); } }catch(InterruptedException e){ System.out.println(name + " is interrupted"); }catch(IOException e){ System.out.println(name + " encounter a connect error"); }finally{ try { selector.close(); } catch (IOException e1) { System.out.println(name + " close selector failed"); }finally{ System.out.println(name + " closed"); } } } } public static void main(String[] args) throws InterruptedException{ InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 8080); Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress)); ta.start(); Thread.sleep(5000); /*結束客戶端a*/ ta.interrupt(); } }