轉載請註明出處:http://www.cnblogs.com/Joanna-Yan/p/7793964.html html
前面講到:Java IO編程全解(三)——僞異步IO編程java
NIO,即New I/O,這是官方叫法,由於它相對於以前的I/O類庫是新增的。可是,因爲以前老的I/O類庫是阻塞I/O,New I/O類庫的目標就是要讓Java支持非阻塞I/O,因此,更多的人喜歡稱之爲非阻塞I/O(Non-block I/O),因爲非阻塞I/O更可以體現NIO的特色,因此這裏使用的NIO都是指非阻塞I/O。編程
與Socket類和ServerSocket類相對應,NIO也提供了SocketChannel和ServerSocketChannel兩種不一樣的套接字通道實現。這兩種新增的通道都支持阻塞和非阻塞兩種模式。阻塞模式使用很是簡單,可是性能和可靠性都很差,非阻塞則正好相反。開發人員通常能夠根據本身的須要來選擇合適的模式,通常來講,低負載、低併發的應用程序能夠選擇同步阻塞I/O以下降編程複雜度,可是對於高負載、高併發的網絡應用,須要使用NIO的非阻塞模式進行開發。數組
新的輸入/輸出(NIO)庫是在JDK1.4中引入的。NIO彌補了原來同步阻塞I/O的不足,它在標準Java代碼中提供了高速的、面向塊的I/O。經過定義包含數據的類,以及經過以塊的形式處理這些數據,NIO不使用本機代碼就能夠利用低級優化,這是原來的I/O包所沒法作到的。下面對NIO的一些概念和功能作下簡單介紹,以便你們可以快速地瞭解NIO類庫和相關概念。服務器
1.緩衝區Buffer微信
Buffer是一個對象,它包含一些要寫入或者要讀出的數據。在NIO類庫中加入Buffer對象,體現了新庫與原I/O的一個重要區別。在面向流的I/O中,能夠將數據直接寫入或者將數據直接讀到Stream對象中。網絡
在NIO庫中,全部數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,寫入到緩衝區中。任什麼時候候訪問NIO中的數據,都是經過緩衝區進行操做。併發
緩衝區實質上是一個數組。一般它是一個字節數組(ByteBuffer),也可使用其餘種類的數組。可是緩衝區不只僅是一個數組,緩衝區提供了對數據的結構化訪問以及維護讀寫位置(limit)等信息。異步
最經常使用的緩衝區是ByteBuffer,一個ByteBuffer提供了一組功能用於操做byte數組。除了ByteBuffer,還有其餘的一些緩衝區,事實上,每一種Java基本類型(除了Boolean類型)都對應有一種緩衝區,具體以下:socket
每個Buffer類都是Buffer接口的一個子實例。除了ByteBuffer,每個Buffer類都有徹底同樣的操做,只是它們所處理的數據類型不同。由於大多數標準I/O操做都是使用ByteBuffer,因此它除了具備通常緩衝區的操做以外還提供一些特有的操做,方便網絡讀寫。
2.通道Channel
Channel是一個通道,能夠經過它讀取和寫入數據,它就像自來水管同樣,網絡數據經過Channel讀取和寫入。通道與流的不一樣之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutputStream的子類),並且通道能夠用於讀、寫或者同時讀寫。由於Channel是全雙工的,因此它能夠比流更好地映射底層操做系統的API。
3.多路複用器Selector
多路複用器Selector是Java NIO編程的基礎,熟練地掌握Selector對於掌握NIO編程相當重要。多路複用器提供選擇已經就緒的任務的能力。簡單來說,Selector會不斷地輪詢註冊在其上的Channel,若是某個Channel上面有新的TCP鏈接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。
一個多路複用器Selector能夠同時輪詢多個Channel,因爲JDK使用了epoll()代替傳統的select實現,因此它並無最大鏈接句柄1024/2048的限制。這也就意味着只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端,這確實是個很是巨大的進步。
NIO服務端通訊序列圖以下圖所示:
下面,咱們對NIO服務端的主要建立過程進行講解和說明,做爲NIO的基礎入門,咱們將忽略掉一些在生產環境中部署所須要的一些特性和功能。
步驟一:打開ServerSocketChannel,用於監聽客戶端的鏈接,它是全部客戶端鏈接的父管道。
ServerSocketChannel acceptorSvr=ServerSocketChannel.open();
步驟二:綁定監聽端口,設置鏈接爲非阻塞模式。
acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port)); acceptorSvr.configureBlocking(false);
步驟三:建立Reactor線程,建立多路複用器並啓動線程。
Selector selector=Selector.open(); New Thread(new ReactorTask()).start();
步驟四:將ServerSocketChannel註冊到Reactor線程的多路複用器Selector上,監聽ACCEPT事件。
SelectionKey key=acceptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandler);
步驟五:多路複用器在線程run方法的無線循環體內輪詢準備就緒的Key。
int num=selector.select(); Set selectedKeys=selector.selectedKeys(); Iterator it=selectedKeys.iterator(); while(it.hasNext()){ SelectionKey key=(SelectionKey )it.next(); //...deal with I/O event... }
步驟六:多路複用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,創建物理鏈路。
SocketChannel channel=svrChannel.accpet();
步驟七:設置客戶端鏈路爲非阻塞模式。
channel.configureBlocking(false); channel.socket().setReuseAddress(true); ......
步驟八:將新接入的客戶端鏈接註冊到Reactor線程的多路複用器,監聽讀操做,用來讀取客戶端發送的網絡消息。
SelectionKey key=socketChannel.register(selector,SelectionKey.OP_READ,ioHandler);
步驟九:異步讀取客戶端請求消息到緩衝區。
int readNumber=channel.read(receivedBuffer);
步驟十:對ByteBuffer進行編解碼,若是有半包消息指針reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排。
Object message=null; while(buffer.hasRemain()){ byteBuffer.mark(); Object message=decode(byteBuffer); if(message==null){ byteBuffer.reset(); break; } messageList.add(message); } if(!byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact(); } if(messageList!=null& !messageList.isEmpty()){ for(Object messageE: messageList){ handlerTask(messageE); } }
步驟十一:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。
socketChannel.write(buffer);
注意:若是發送區TCP緩衝區滿,會致使寫半包,此時,須要註冊監聽寫操做位,循環寫,直到整包消息寫入TCP緩衝區。
當咱們瞭解建立NIO服務端的基本步驟以後,下面咱們將前面的時間服務器程序經過NIO重寫一遍,讓你們可以學習到完整版的NIO服務端建立。
package joanna.yan.nio; public class TimeServer { public static void main(String[] args) { int port=9090; if(args!=null&&args.length>0){ try { port=Integer.valueOf(args[0]); } catch (Exception e) { // 採用默認值 } } MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
package joanna.yan.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; /** * 多路複用類 * 它是一個獨立的線程,負責輪詢多路復器Selector,能夠處理多個客戶端的併發接入。 * @author Joanna.Yan * @date 2017年11月6日下午3:51:41 */ public class MultiplexerTimeServer implements Runnable{ private Selector selector;//多路複用器 private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路複用器、綁定監聽端口 * @param port */ public MultiplexerTimeServer(int port){ try { selector=Selector.open(); servChannel=ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port: "+port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop=true; }
@Override public void run() { while(!stop){ try { //設置selector的休眠時間爲1s,不管是否有讀寫等事件發生,selector每隔1s都被喚醒一次。 selector.select(1000); //當有處於就緒狀態的Channel時,selector就返回就緒狀態的Channel的SelectionKey集合。 Set<SelectionKey> selectedKeys=selector.selectedKeys(); Iterator<SelectionKey> it=selectedKeys.iterator(); SelectionKey key=null; //經過對就緒狀態的Channel集合進行迭代,能夠進行網絡的異步讀寫操做。 while(it.hasNext()){ key=it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } /* * 多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源。 */ if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求消息 //經過SelectionKey的操做位進行判斷便可獲知網絡事件類型 if(key.isAcceptable()){ //Accept the new connection ServerSocketChannel ssc=(ServerSocketChannel) key.channel(); SocketChannel sc=ssc.accept(); //-----以上操做至關於完成了TCP的三次握手,TCP物理鏈路正式創建------ //將新建立的SocketChannel設置爲異步非阻塞,同時也能夠對其TCP參數進行設置,例如TCP接收和發送緩衝區的大小等。 sc.configureBlocking(false); //Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if(key.isReadable()){ //Read the data SocketChannel sc=(SocketChannel) key.channel(); //因爲實現咱們得知客戶端發送的碼流大小,做爲例程,咱們開闢一個1K的緩衝區 ByteBuffer readBuffer=ByteBuffer.allocate(1024); //因爲已經設置SocketChannel爲異步非阻塞模式,所以它的read是非阻塞的。 int readBytes=sc.read(readBuffer); /* * readBytes>0 讀到了字節,對字節進行編解碼; * readBytes=0 沒有讀取到字節,屬於正常場景,忽略; * readByte=-1 鏈路已經關閉,須要關閉SocketChannel,釋放資源 */ if(readBytes>0){ //將緩衝區當前的limit設置爲position,position設置爲0,用於後續對緩衝區的讀取操做。 readBuffer.flip(); //根據緩衝區可讀的字節個數建立字節數組 byte[] bytes=new byte[readBuffer.remaining()]; //調用ByteBuffer的get操做將緩衝區可讀的字節數組複製到新建立愛你的字節數組中 readBuffer.get(bytes); String body=new String(bytes, "UTF-8"); System.out.println("The time server receive order: "+body); //若是請求指令是"QUERY TIME ORDER"則把服務器的當前時間編碼後返回給客戶端 String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes<0){ //對端鏈路關閉 key.cancel(); sc.close(); }else{ //讀到0字節,忽略 } } } } private void doWrite(SocketChannel channel,String response) throws IOException{ if(response!=null&& response.trim().length()>0){ byte[] bytes=response.getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length); //調用ByteBuffer的put操做將字節數組複製到緩衝區 writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); /* * 須要指出的是,因爲SocketChannel是異步非阻塞的,它並不保證一次性可以把須要發送的字節數組發送完, * 此時會出現「寫半包」問題,咱們須要註冊寫操做,不斷輪詢Selector,將沒有發送完畢的ByteBuffer發送完畢, * 能夠經過ByteBuffer的hasRemaining()方法判斷消息是否發送完成。 * 此處僅僅是各簡單的入門級例程,沒有演示如何處理「寫半包」場景,後面會說到。 */ } } }
NIO客戶端建立序列圖如圖所示。
步驟一:打開SocketChannel,綁定客戶端本地地址(可選,默認系統會隨機分配一個可用的本地地址)
SocketChannel clientChannel=SocketChannel.open();
步驟二:設置SocketChannel爲非阻塞模式,同時設置客戶端鏈接的TCP參數。
clientChannel.configureBlocking(false); socket.setReuseAddress(true); socket.setReceiveBufferSize(BUFFER_SIZE); socket.setSendBufferSize(BUFFER_SIZE);
步驟三:異步鏈接服務端。
boolean connected=clientChannel.connect(new InetSocketAddress("ip",port));
步驟四:判斷是否鏈接成功,若是鏈接成功,則直接註冊讀狀態位到多路複用器中,若是當前沒有鏈接成功(異步鏈接,返回false,說明客戶端已經發送sync包,服務端沒有返回ack包,物理鏈路尚未創建)。
if(connected){ clientChannel.register(selector,SelectionKey.OP_READ,ioHandler); }else{ clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler); }
步驟五:向Reactor線程的多路複用器註冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答。
clientChannel.register(selector,SelectionKay.OP_CONNECT,ioHandler);
步驟六:建立Reactor線程,建立多路複用器並啓動線程。
Selector selector=Selector.open(); new Thread(new ReactorTask()).start();
步驟七:多路複用器在線程run方法的無限循環體內輪詢準備就緒的key。
int num=selector.select(); Set selectedKeys=selector.selectedKeys(); Iterator it=selectedKeys.iterator(); while(it.hasNext()){ SelectionKey key=(SelectionKey)it.next(); //...deal with I/O event... }
步驟八:接收connect事件進行處理。
if(key.isConnectable()){ //handlerConnect(); }
步驟九:判斷鏈接結果,若是鏈接成功,註冊讀事件到多路複用器。
if(channel.finishConnect()){ registerRead(); }
步驟十:註冊讀事件到多路複用器。
clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
步驟十一:異步讀客戶端請求消息到緩衝區。
int readNumber=channel.read(receivedBuffer);
步驟十二:對ByteBuffer進行編解碼,若是有半包消息接收緩衝區Reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排。
Object message=null; while(buffer.hasRemain()){ byteBuffer.mark(); Object message=decode(byteBuffer); if(message==null){ byteBuffer.reset(); break; } messageList.add(message); } if(!byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact(); } if(messageList!=null & !messageList.isEmpty()){ for(Object messageE:messageList){ handlerTask(messageE); } }
步驟十三:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。
socketChannel.wirte(buffer);
package joanna.yan.nio; public class TimeClient { public static void main(String[] args) { int port=9090; if(args!=null&&args.length>0){ try { port=Integer.valueOf(args[0]); } catch (Exception e) { // 採用默認值 } } new Thread(new TimeClientHandle("127.0.0.1", port),"TimClient-001").start(); } }
package joanna.yan.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * 處理異步鏈接和讀寫操做 * @author Joanna.Yan * @date 2017年11月6日下午4:33:14 */ public class TimeClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; /** * 初始化NIO的多路複用器和SocketChannel對象 * @param host * @param port */ public TimeClientHandle(String host,int port){ this.host=host==null ? "127.0.0.1" : host; this.port=port; try { selector=Selector.open(); socketChannel=SocketChannel.open(); //設置爲異步非阻塞模式,同時還能夠設置SocketChannel的TCP參數。例如接收和發送的TCP緩衝區大小 socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } }
@Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while(!stop){ try { selector.select(1000); Set<SelectionKey> selectedKeys=selector.selectedKeys(); Iterator<SelectionKey> it=selectedKeys.iterator(); SelectionKey key=null; while(it.hasNext()){//輪詢多路複用器Selector,當有就緒的Channel時 key=it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } //多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動註冊並關閉,因此不須要重複釋放資源。 /* * 因爲多路複用器上可能註冊成千上萬的Channel或者pipe,若是一一對這些資源進行釋放顯然不合適。 * 所以,JDK底層會自動釋放全部跟此多路複用器關聯的資源。 */ if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動註冊並關閉,因此不須要重複釋放資源。 /* * 因爲多路複用器上可能註冊成千上萬的Channel或者pipe,若是一一對這些資源進行釋放顯然不合適。 * 所以,JDK底層會自動釋放全部跟此多路複用器關聯的資源。 */ if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws ClosedChannelException, IOException { if(key.isValid()){ //判斷是否鏈接成功 SocketChannel sc=(SocketChannel) key.channel(); if(key.isConnectable()){//處於鏈接狀態,說明服務器已經返回ACK應答消息 if(sc.finishConnect()){//對鏈接結果進行判斷 /* * 將SocketChannel註冊到多路複用器上,註冊SelectionKey.OP_READ操做位, * 監聽網絡讀操做,而後發送請求消息給服務端。 */ sc.register(selector, SelectionKey.OP_READ); doWrite(sc); }else{ System.exit(1);//鏈接失敗,進程退出 } } if(key.isReadable()){ //開闢緩衝區 ByteBuffer readBuffer=ByteBuffer.allocate(1024); //異步讀取 int readBytes=sc.read(readBuffer); if(readBytes>0){ readBuffer.flip(); byte[] bytes=new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body=new String(bytes, "UTF-8"); System.out.println("Now is: "+body); this.stop=true; }else if(readBytes<0){ //對端鏈路關閉 key.cancel(); sc.close(); }else{ //讀到0字節,忽略 } } } } private void doConnect() throws IOException { //若是直接鏈接成功,則將SocketChannel註冊到多路複用器Selector上,發送請求消息,讀應答 if(socketChannel.connect(new InetSocketAddress(host, port))){ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{ /* * 若是沒有直接鏈接成功,則說明服務端沒有返回TCP握手應答信息,但這並不表明鏈接失敗, * 咱們須要將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_CONNECT, * 當服務端返回TCP syn-ack消息後,Selector就能輪詢到整個SocketChannel處於鏈接就緒狀態。 */ socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { byte[] req="QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(req.length); //寫入到發送緩衝區中 writeBuffer.put(req); writeBuffer.flip(); //因爲發送是異步的,因此會存在"半包寫"問題,此處不贅述 sc.write(writeBuffer); if(!writeBuffer.hasRemaining()){//若是緩衝區中的消息所有發送完成 System.out.println("Send order 2 server succeed."); } } }
經過源碼對比分析發現,NIO編程難度確實比同步阻塞BIO大不少,此處咱們的NIO例程並無考慮「半包讀」和「半包寫」,若是加上這些,代碼會更加複雜。NIO代碼既然這麼複雜,爲何它的應用卻愈來愈普遍呢,使用NIO編程的優勢總結以下:
JDK1.7升級了NIO類庫,升級後的NIO類庫被稱爲NIO 2.0。引入注目的是,Java正式提供了異步文件I/O操做,同時提供了與UNIX網絡編程事件驅動I/O對應的AIO。
若是此文對您有幫助,微信打賞我一下吧~