一、reactor(反應器)模式html
使用單線程模擬多線程,提升資源利用率和程序的效率,增長系統吞吐量。下面例子比較形象的說明了什麼是反應器模式:java
一個老闆經營一個飯店,react
傳統模式 - 來一個客人安排一個服務員招呼,客人很滿意;(至關於一個鏈接一個線程)服務器
後來客人愈來愈多,須要的服務員愈來愈多,資源條件不足以再請更多的服務員了,傳統模式已經不能知足需求。老闆之因此爲老闆天然有過人之處,老闆發現,服務員在爲客人服務時,當客人點菜的時候,服務員基本處於等待狀態,(阻塞線程,不作事)。網絡
因而乎就讓服務員在客人點菜的時候,去爲其餘客人服務,當客人菜點好後再招呼服務員便可。 --反應器(reactor)模式誕生了多線程
飯店的生意紅紅火火,幾個服務員就足以支撐大量的客流量,老闆用有限的資源賺了更多的money~~~~^_^app
二、NIO中的重要概念 通道、緩衝區、選擇器異步
通道:相似於流,可是能夠異步讀寫數據(流只能同步讀寫),通道是雙向的,(流是單向的),通道的數據老是要先讀到一個buffer 或者 從一個buffer寫入,即通道與buffer進行數據交互。socket
通道類型: ide
FileChannel比較特殊,它能夠與通道進行數據交互, 不能切換到非阻塞模式,套接字通道能夠切換到非阻塞模式;
緩衝區 - 本質上是一塊能夠存儲數據的內存,被封裝成了buffer對象而已!
緩衝區類型:
經常使用方法:
緩衝區的一些屬性:
切換到讀模式時,position會被置爲0,表示當前讀的位置
選擇器:至關於一個觀察者,用來監聽通道感興趣的事件,一個選擇器能夠綁定多個通道;
通道向選擇器註冊時,須要指定感興趣的事件,選擇器支持如下事件:
若是你對不止一種事件感興趣,那麼能夠用「位或」操做符將常量鏈接起來,以下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
通道向選擇器註冊時,會返回一個 SelectionKey對象,具備以下屬性
用「位與」操做interest 集合和給定的SelectionKey常量,能夠肯定某個肯定的事件是否在interest 集合中。
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
ready 集合是通道已經準備就緒的操做的集合。在一次選擇(Selection)以後,你會首先訪問這個ready set。Selection將在下一小節進行解釋。能夠這樣訪問ready集合:
int readySet = selectionKey.readyOps();
也可使用如下四個方法獲取已就緒事件,返回值爲boolean:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
能夠將一個對象或者更多信息附着到SelectionKey上,即記錄在附加對象上,方法以下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
能夠經過選擇器的select方法獲取是否有就緒的通道;
返回值表示上次執行select以後,就緒通道的個數。
能夠經過selectedKeySet獲取已就緒的通道。返回值是SelectionKey 的集合,處理完相應的通道以後,須要removed 由於Selector不會本身removed
select阻塞後,能夠用wakeup喚醒;執行wakeup時,若是沒有阻塞的select 那麼執行完wakeup後下一個執行select就會當即返回。
調用close() 方法關閉selector
下面是一個簡單的實例代碼,幫助理解上面的內容:
package com.pt.nio; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Iterator; import java.util.Set; public class Reactor implements Runnable { public int id = 100001; public int bufferSize = 2048; @Override public void run() { // TODO Auto-generated method stub init(); } public void init() { try { // 建立通道和選擇器 ServerSocketChannel socketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress( InetAddress.getLocalHost(), 4700); socketChannel.socket().bind(inetSocketAddress); // 設置通道非阻塞 綁定選擇器 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_ACCEPT).attach( id++); System.out.println("Server started .... port:4700"); listener(selector); } catch (Exception e) { // TODO: handle exception } } public void listener(Selector in_selector) { try { while (true) { Thread.sleep(1*1000); in_selector.select(); // 阻塞 直到有就緒事件爲止 Set<SelectionKey> readySelectionKey = in_selector .selectedKeys(); Iterator<SelectionKey> it = readySelectionKey.iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); // 判斷是哪一個事件 if (selectionKey.isAcceptable()) {// 客戶請求鏈接 System.out.println(selectionKey.attachment() + " - 接受請求事件"); // 獲取通道 接受鏈接, // 設置非阻塞模式(必須),同時須要註冊 讀寫數據的事件,這樣有消息觸發時才能捕獲 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey .channel(); serverSocketChannel .accept() .configureBlocking(false) .register( in_selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE).attach(id++); System.out .println(selectionKey.attachment() + " - 已鏈接"); // 下面這種寫法是有問題的 不該該在serverSocketChannel上面註冊 /* * serverSocketChannel.configureBlocking(false); * serverSocketChannel.register(in_selector, * SelectionKey.OP_READ); * serverSocketChannel.register(in_selector, * SelectionKey.OP_WRITE); */ } if (selectionKey.isReadable()) {// 讀數據 System.out.println(selectionKey.attachment() + " - 讀數據事件"); SocketChannel clientChannel=(SocketChannel)selectionKey.channel(); ByteBuffer receiveBuf = ByteBuffer.allocate(bufferSize); clientChannel.read(receiveBuf); System.out.println(selectionKey.attachment() + " - 讀取數據:" + getString(receiveBuf)); } if (selectionKey.isWritable()) {// 寫數據 System.out.println(selectionKey.attachment() + " - 寫數據事件"); SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); ByteBuffer sendBuf = ByteBuffer.allocate(bufferSize); String sendText = "hello\n"; sendBuf.put(sendText.getBytes()); sendBuf.flip(); //寫完數據後調用此方法 clientChannel.write(sendBuf); } if (selectionKey.isConnectable()) { System.out.println(selectionKey.attachment() + " - 鏈接事件"); } // 必須removed 不然會繼續存在,下一次循環還會進來, // 注意removed 的位置,針對一個.next() remove一次 it.remove(); } } } catch (Exception e) { // TODO: handle exception System.out.println("Error - " + e.getMessage()); e.printStackTrace(); } } /** * ByteBuffer 轉換 String * @param buffer * @return */ public static String getString(ByteBuffer buffer) { String string = ""; try { for(int i = 0; i<buffer.position();i++){ string += (char)buffer.get(i); } return string; } catch (Exception ex) { ex.printStackTrace(); return ""; } } }
package com.pt.bio; import java.io.*; import java.net.*; public class BioServer implements Runnable { @Override public void run() { // TODO Auto-generated method stub System.out.println("Hello Server!!"); try { ServerSocket server = null; try { server = new ServerSocket(4700); // 建立一個ServerSocket在端口4700監聽客戶請求 } catch (Exception e) { System.out.println("can not listen to:" + e); // 出錯,打印出錯信息 } Socket socket = null; try { socket = server.accept(); // 使用accept()阻塞等待客戶請求,有客戶 // 請求到來則產生一個Socket對象,並繼續執行 } catch (Exception e) { System.out.println("Error." + e); // 出錯,打印出錯信息 } String line; BufferedReader is = new BufferedReader(new InputStreamReader( socket.getInputStream())); // 由Socket對象獲得輸入流,並構造相應的BufferedReader對象 // 由Socket對象獲得輸出流,並構造PrintWriter對象 // BufferedReader sin = new BufferedReader(new InputStreamReader( // System.in)); // 由系統標準輸入設備構造BufferedReader對象 System.out.println("Client:" + is.readLine()); PrintWriter os = new PrintWriter(socket.getOutputStream()); // 在標準輸出上打印從客戶端讀入的字符串 line = "hello"; // 從標準輸入讀入一字符串 // while (!line.equals("bye")) { // 若是該字符串爲 "bye",則中止循環 os.println(line); // 向客戶端輸出該字符串 os.flush(); // 刷新輸出流,使Client立刻收到該字符串 // System.out.println("Server:" + line); // 在系統標準輸出上打印讀入的字符串 // System.out.println("Client:" + is.readLine()); // 從Client讀入一字符串,並打印到標準輸出上 // line = sin.readLine(); // 從系統標準輸入讀入一字符串 // } // 繼續循環 // os.close(); // 關閉Socket輸出流 is.close(); // 關閉Socket輸入流 socket.close(); // 關閉Socket server.close(); // 關閉ServerSocket } catch (Exception e) { System.out.println("Error." + e); // 出錯,打印出錯信息 } } }
package com.pt; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import org.junit.Test; import com.pt.bio.BioServer; import com.pt.nio.Reactor; public class TestReactor { @Test public void testConnect() throws Exception{ Socket socket=new Socket("192.168.82.35",4700);//BIO 阻塞 System.out.println("鏈接成功"); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //下面這種寫法,不用關閉客戶端,服務器端也是能夠收到的 { PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); printWriter.println("hi"); printWriter.flush(); } //這種寫法必須關閉客戶端,服務器端才能夠收到 NIO不用 { // socket.getOutputStream().write(new byte[]{'h','i'}); // socket.getOutputStream().flush(); //必須關閉BIO服務器才能收到消息.NIO服務器不須要關閉 //socket.close(); } byte[] buf = new byte[2048]; System.out.println("準備讀取數據~~"); while(true){ try { //兩種讀取數據方式 int count = socket.getInputStream().read(buf); //會阻塞 //String readFromServer = bufferedReader.readLine();//能夠讀取到數據 會阻塞,直到碰見\n //System.out.println("方式二: 讀取數據" + readFromServer); System.out.println("方式一: 讀取數據" + new String(buf) + " count = " + count); Thread.sleep(1*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //break; } } @Test public void testNioServer(){ Thread server = new Thread(new Reactor()); server.start(); while(true){ try { Thread.sleep(3*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Test public void testBioServer(){ Thread server = new Thread(new BioServer()); server.start(); while(true){ try { Thread.sleep(3*1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
其中 testNioServer()方法,是啓動NIO服務器端;
testBioServer()方法是啓動BIO服務器端
testConnect()是BIO的一個鏈接
基於NIO實現的時鐘服務器:http://www.cnblogs.com/tengpan-cn/p/6529628.html
一篇寫的比較詳細的JAVA NIO的文章:http://www.iteye.com/magazines/132-Java-NIO