阻塞與非阻塞是描述進程在訪問某個資源時,數據是否準備就緒的的一種處理方式。當數據沒有準備就緒時:java
阻塞:線程持續等待資源中數據準備完成,直到返回響應結果。數組
非阻塞:線程直接返回結果,不會持續等待資源準備數據結束後才響應結果。緩存
同步與異步是指訪問數據的機制,同步通常指主動請求並等待IO操做完成的方式。服務器
異步則指主動請求數據後即可以繼續處理其它任務,隨後等待IO操做完畢的通知。網絡
老王燒開水:異步
一、普通水壺煮水,站在旁邊,主動的看水開了沒有?同步的阻塞socket
二、普通水壺煮水,去幹點別的事,每過一段時間去看看水開了沒有,水沒開就走人。 同步非阻塞ide
三、響水壺煮水,站在旁邊,不會每過一段時間主動看水開了沒有。若是水開了,水壺自動通知他。 異步阻塞測試
四、響水壺煮水,去幹點別的事,若是水開了,水壺自動通知他。異步非阻塞this
傳統BIO是一種同步的阻塞IO,IO在進行讀寫時,該線程將被阻塞,線程沒法進行其它操做。
IO流在讀取時,會阻塞。直到發生如下狀況:一、有數據能夠讀取。二、數據讀取完成。三、發生異常。
以傳統BIO模型爲基礎,經過線程池的方式維護全部的IO線程,實現相對高效的線程開銷及管理。
NIO(JDK1.4)模型是一種同步非阻塞IO,主要有三大核心部分:Channel(通道),Buffer(緩衝區), Selector(多路複用器)。傳統IO基於字節流和字符流進行操做,而NIO基於Channel和Buffer(緩衝區)進行操做,數據老是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(多路複用器)用於監聽多個通道的事件(好比:鏈接打開,數據到達)。所以,單個線程能夠監聽多個數據通道。
NIO和傳統IO(一下簡稱IO)之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取全部字節,它們沒有被緩存在任何地方。此外,它不能先後移動流中的數據。若是須要先後移動從流中讀取的數據,須要先將它緩存到一個緩衝區。NIO的緩衝導向方法略有不一樣。數據讀取到一個它稍後處理的緩衝區,須要時可在緩衝區中先後移動。這就增長了處理過程當中的靈活性。可是,還須要檢查是否該緩衝區中包含全部您須要處理的數據。並且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區裏還沒有處理的數據。
IO的各類流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據徹底寫入。該線程在此期間不能再幹任何事情了。 NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,可是它僅能獲得目前可用的數據,若是目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,因此直至數據變的能夠讀取以前,該線程能夠繼續作其餘的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不須要等待它徹底寫入,這個線程同時能夠去作別的事情。 線程一般將非阻塞IO的空閒時間用於在其它通道上執行IO操做,因此一個單獨的線程如今能夠管理多個輸入和輸出通道(channel)。
NIO優勢:
傳統IO操做對read()或write()方法的調用,可能會由於沒有數據可讀/可寫而阻塞,直到有數據響應。也就是說讀寫數據的IO調用,可能會無限期的阻塞等待,效率依賴網絡傳輸的速度。最重要的是在調用一個方法前,沒法知道是否會被阻塞。
NIO的Channel抽象了一個重要特徵就是能夠經過配置它的阻塞行爲,來實現非阻塞式的通道。
Channel是一個雙向通道,與傳統IO操做只容許單向的讀寫不一樣的是,NIO的Channel容許在一個通道上進行讀和寫的操做。
FileChannel:文件
SocketChannel:
ServerSocketChannel:
DatagramChannel: UDP
Bufer顧名思義,它是一個緩衝區,其實是一個容器,一個連續數組。Channel提供從文件、網絡讀取數據的渠道,可是讀寫的數據都必須通過Buffer。
Buffer緩衝區本質上是一塊能夠寫入數據,而後能夠從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該模塊內存。爲了理解Buffer的工做原理,須要熟悉它的三個屬性:capacity、position和limit。
position和limit的含義取決於Buffer處在讀模式仍是寫模式。無論Buffer處在什麼模式,capacity的含義老是同樣的。見下圖:
capacity:做爲一個內存塊,Buffer有固定的大小值,也叫做「capacity」,只能往其中寫入capacity個byte、long、char等類型。一旦Buffer滿了,須要將其清空(經過讀數據或者清楚數據)才能繼續寫數據。
position:當你寫數據到Buffer中時,position表示當前的位置。出事的position值爲0,當寫入一個字節數據到Buffer中後,position會向前移動到下一個可插入數據的Buffer單元。position最大可爲capacity-1。當讀取數據時,也是從某個特定位置讀,講Buffer從寫模式切換到讀模式,position會被重置爲0。當從Buffer的position處讀取一個字節數據後,position向前移動到下一個可讀的位置。
limit:在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。 寫模式下,limit等於Buffer的capacity。當切換Buffer到讀模式時, limit表示你最多能讀到多少數據。所以,當切換Buffer到讀模式時,limit會被設置成寫模式下的position值。換句話說,你能讀到以前寫入的全部數據(limit被設置成已寫數據的數量,這個值在寫模式下就是position)
Buffer的分配:對Buffer對象的操做必須首先進行分配,Buffer提供一個allocate(int capacity)方法分配一個指定字節大小的對象。
向Buffer中寫數據:寫數據到Buffer中有兩種方式:
一、從channel寫到Buffer
int bytes = channel.read(buf); //將channel中的數據讀取到buf中
二、經過Buffer的put()方法寫到Buffer
buf.put(byte); //將數據經過put()方法寫入到buf中
flip()方法:將Buffer從寫模式切換到讀模式,調用flip()方法會將position設置爲0,並將limit設置爲以前的position的值。
從Buffer中讀數據:從Buffer中讀數據有兩種方式:
一、從Buffer讀取數據到Channel
int bytes = channel.write(buf); //將buf中的數據讀取到channel中
二、經過Buffer的get()方法讀取數據
byte bt = buf.get(); //從buf中讀取一個byte
rewind()方法:Buffer.rewind()方法將position設置爲0,使得能夠重讀Buffer中的全部數據,limit保持不變。
clear()與compact()方法:一旦讀完Buffer中的數據,須要讓Buffer準備好再次被寫入,能夠經過clear()或compact()方法完成。若是調用的是clear()方法,position將被設置爲0,limit設置爲capacity的值。可是Buffer並未被清空,只是經過這些標記告訴咱們能夠從哪裏開始往Buffer中寫入多少數據。若是Buffer中還有一些未讀的數據,調用clear()方法將被"遺忘 "。compact()方法將全部未讀的數據拷貝到Buffer起始處,而後將position設置到最後一個未讀元素的後面,limit屬性依然設置爲capacity。可使得Buffer中的未讀數據還能夠在後續中被使用。
mark()與reset()方法:經過調用Buffer.mark()方法能夠標記一個特定的position,以後能夠經過調用Buffer.reset()恢復到這個position上。
Selector與Channel是相互配合使用的,將Channel註冊在Selector上以後,才能夠正確的使用Selector,但此時Channel必須爲非阻塞模式。Selector能夠監聽Channel的四種狀態(Connect、Accept、Read、Write),當監聽到某一Channel的某個狀態時,才容許對Channel進行相應的操做。
測試代碼
/** * 服務端 */ public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean stop; public MultiplexerTimeServer(int port) { try { //打開ServerSocketChannel serverChannel = ServerSocketChannel.open(); //設置爲非阻塞模式 serverChannel.configureBlocking(false); //綁定監聽的端口地址 serverChannel.socket().bind(new InetSocketAddress(port), 1024); //建立Selector線程 selector = Selector.open(); //將ServerSocketChannel註冊到Selector,交給Selector監聽 serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port:"+port); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop = true; } @Override public void run() { while(!stop){ try { //經過Selector循環準備就緒的Key selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); SelectionKey selectionKey = null; while(iterator.hasNext()){ selectionKey = iterator.next(); iterator.remove(); try { handleInput(selectionKey); } catch (Exception e) { if(selectionKey!=null){ selectionKey.cancel(); if(selectionKey.channel()!=null){ selectionKey.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); } } if(selector !=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey selectionKey) throws IOException { if(selectionKey.isValid()){ if (selectionKey.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); //多路複用器監聽到新的客戶端鏈接,處理鏈接請求,完成TCP三次握手。 SocketChannel client = server.accept(); //設置爲非阻塞模式 client.configureBlocking(false); // 將新鏈接註冊到多路複用器上,監聽其讀操做,讀取客戶端發送的消息。 client.register(selector, SelectionKey.OP_READ); } if(selectionKey.isReadable()){ SocketChannel client = (SocketChannel) selectionKey.channel(); ByteBuffer receivebuffer = ByteBuffer.allocate(1024); //讀取客戶端請求消息到緩衝區 int count = client.read(receivebuffer); //非阻塞 if (count > 0) { receivebuffer.flip(); byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法 //從緩衝區讀取消息 receivebuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body); //將currentTime響應給客戶端(客戶端Channel) String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(client, currentTime); }else if(count < 0){ selectionKey.channel(); client.close(); }else{ } } } } private void doWrite(SocketChannel client, String currentTime) throws IOException { if(currentTime != null && currentTime.trim().length()>0){ ByteBuffer sendbuffer = ByteBuffer.allocate(1024); sendbuffer.put(currentTime.getBytes()); sendbuffer.flip(); //將客戶端響應消息寫入到客戶端Channel中。 client.write(sendbuffer); System.out.println("服務器端向客戶端發送數據--:" + currentTime); }else{ System.out.println("沒有數據"); } } }
/** * 客戶端 */ public class TimeClientHandler implements Runnable { private String host; private int port; private SocketChannel socketChannel; private Selector selector; private volatile boolean stop; public TimeClientHandler(String host, int port) { this.host = host; this.port = port; try { //打開SocketChannel socketChannel = SocketChannel.open(); //建立Selector線程 selector = Selector.open(); //設置爲非阻塞模式 socketChannel.configureBlocking(false); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try { doConnect(); } catch (Exception e) { e.printStackTrace(); System.exit(1); } while(!stop){ //輪訓通道的狀態 try { selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); SelectionKey selectionKey = null; while(iterator.hasNext()){ selectionKey = iterator.next(); iterator.remove(); try { handleInput(selectionKey); } catch (Exception e) { if(selectionKey!=null){ selectionKey.cancel(); if(selectionKey.channel()!=null){ selectionKey.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } if(selector !=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey selectionKey) throws Exception { if(selectionKey.isValid()){ SocketChannel client = (SocketChannel) selectionKey.channel(); if (selectionKey.isConnectable()){ if(client.finishConnect()){ client.register(selector, SelectionKey.OP_READ); doWrite(client); }else{ System.exit(1); } } if (selectionKey.isReadable()) { ByteBuffer receivebuffer = ByteBuffer.allocate(1024); int count = client.read(receivebuffer); if (count > 0) { receivebuffer.flip(); byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法 receivebuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is "+body); this.stop = true; }else if(count < 0){ selectionKey.channel(); client.close(); }else{ } } } } private void doConnect() throws Exception { //鏈接服務端 boolean connect = socketChannel.connect(new InetSocketAddress(host, port)); //判斷是否鏈接成功,若是鏈接成功,則監聽Channel的讀狀態。 if(connect){ socketChannel.register(selector, SelectionKey.OP_READ); //寫數據 寫給服務端 doWrite(socketChannel); }else{ //若是沒有鏈接成功,則向多路複用器註冊Connect狀態 socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel channel) throws IOException { ByteBuffer sendbuffer = ByteBuffer.allocate(1024); sendbuffer.put("QUERY TIME ORDER".getBytes()); sendbuffer.flip(); //向Channel中寫入客戶端的請求指令 寫到服務端 channel.write(sendbuffer); if(!sendbuffer.hasRemaining()){ System.out.println("Send order to server succeed."); } } }
/** * 運行服務端 **/ public class TimeServer { public static void main(String[] args) { int port=8080; //服務端默認端口 MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } } /** * 運行客戶端 **/ public class TimeServerClient { public static void main(String[] args) { int port=8080; //服務端默認端口 new Thread(new TimeClientHandler("127.0.0.1", port), "NIO-TimeServerClient-001").start(); } }