何謂Reactor模式?它是實現高性能IO的一種設計模式。網上資料有不少,有些寫的也很好,但大多不知其因此然。這裏博主按本身的思路簡單介紹下,有不對的地方敬請指正。html
BIOjava
Java1.4(2002年)之前,IO都是Blocking的,也就是常說的BIO,它在等待請求、讀、寫(返回)三個環節都是阻塞的。在等待請求階段,系統沒法知道請求什麼時候到達,所以須要一個主線程一直守着,當有請求進來時,將請求分發給讀寫線程。如圖:linux
代碼以下:編程
ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//線程池 ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(8088); while(!Thread.currentThread.isInturrupted()){//主線程死循環等待新鏈接到來 Socket socket = serverSocket.accept(); executor.submit(new ConnectIOnHandler(socket));//爲新的鏈接建立新的線程 }
class ConnectIOnHandler extends Thread{ private Socket socket; public ConnectIOnHandler(Socket socket){ this.socket = socket; } public void run(){ while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){//死循環處理讀寫事件 String someThing = socket.read()....//讀取數據 if(someThing!=null){
......//處理數據
socket.write()....//寫數據 } } }
需知,請求進來(accept),並不表示數據立刻達到了,可能隔一段時間纔會傳進來,這個時候socket.read()也是一直阻塞的狀態。socket.write()也同理,當向磁盤或其它socket寫數據時,也要等對方準備好才能寫入,在對方準備階段,socket.write()也是阻塞的。這兩個環節可能的無效阻塞致使讀寫線程的低效。segmentfault
NIO設計模式
Java1.4開始,引入了NIO。NIO有三個概念:Selector、Buffer、Channel。與BIO的區別是,請求進來後,並不會立刻分派IO線程,而是依靠操做系統底層的多路複用機制(select/poll/epoll等),在監聽到socket讀寫就緒以後,再分配IO線程(實際可由當前線程[使用Buffer和Channel]直接讀寫,由於讀寫自己的效率很高),這就避免了線程等待。且與BIO多線程方式相比,使用I/O多路複用技術,系統沒必要建立和維護龐大的線程池,從而大大減少了開銷。這部分工做是NIO的核心,由Selector負責,本質上是多路複用的Java封裝。而Buffer和Channel又封裝了一層socket的讀寫,應該爲的是將IO與業務代碼完全分離。如下圖示爲本人理解:網絡
如圖示,與BIO中監聽線程職責不一樣,Selector監聽的不僅是鏈接請求,還有讀寫就緒事件,當某個事件發生時,即通知註冊了該事件的Channel,由Channel操做socket讀寫Buffer。虛線表示須要具體的NIO框架或業務代碼本身處理,好比Channel如何註冊以及註冊何種事件,Channel處理IO的方式(如在當前線程處理仍是新開線程,若新開線程,則可看做是AIO模式)等。NIO只是提供了一套機制,具體使用仍是須要編程實現(Reactor模式就是OO的一種實現)。多線程
示例代碼(摘自Java NIO詳解)框架
服務端:異步
1 package cn.blog.test.NioTest; 2 3 4 import java.io.IOException; 5 import java.net.InetSocketAddress; 6 import java.nio.ByteBuffer; 7 import java.nio.channels.*; 8 import java.nio.charset.Charset; 9 import java.util.Iterator; 10 import java.util.Set; 11 12 13 public class MyNioServer { 14 private Selector selector; //建立一個選擇器 15 private final static int port = 8686; 16 private final static int BUF_SIZE = 10240; 17 18 private void initServer() throws IOException { 19 //建立通道管理器對象selector 20 this.selector=Selector.open(); 21 22 //建立一個通道對象channel 23 ServerSocketChannel channel = ServerSocketChannel.open(); 24 channel.configureBlocking(false); //將通道設置爲非阻塞 25 channel.socket().bind(new InetSocketAddress(port)); //將通道綁定在8686端口 26 27 //將上述的通道管理器和通道綁定,併爲該通道註冊OP_ACCEPT事件 28 //註冊事件後,當該事件到達時,selector.select()會返回(一個key),若是該事件沒到達selector.select()會一直阻塞 29 SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT); 30 31 while (true){ //輪詢 32 selector.select(); //這是一個阻塞方法,一直等待直到有數據可讀,返回值是key的數量(能夠有多個) 33 Set keys = selector.selectedKeys(); //若是channel有數據了,將生成的key訪入keys集合中 34 Iterator iterator = keys.iterator(); //獲得這個keys集合的迭代器 35 while (iterator.hasNext()){ //使用迭代器遍歷集合 36 SelectionKey key = (SelectionKey) iterator.next(); //獲得集合中的一個key實例 37 iterator.remove(); //拿到當前key實例以後記得在迭代器中將這個元素刪除,很是重要,不然會出錯 38 if (key.isAcceptable()){ //判斷當前key所表明的channel是否在Acceptable狀態,若是是就進行接收 39 doAccept(key); 40 }else if (key.isReadable()){ 41 doRead(key); 42 }else if (key.isWritable() && key.isValid()){ 43 doWrite(key); 44 }else if (key.isConnectable()){ 45 System.out.println("鏈接成功!"); 46 } 47 } 48 } 49 } 50 51 public void doAccept(SelectionKey key) throws IOException { 52 ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); 53 System.out.println("ServerSocketChannel正在循環監聽"); 54 SocketChannel clientChannel = serverChannel.accept(); 55 clientChannel.configureBlocking(false); 56 clientChannel.register(key.selector(),SelectionKey.OP_READ); 57 } 58 59 public void doRead(SelectionKey key) throws IOException { 60 SocketChannel clientChannel = (SocketChannel) key.channel(); 61 ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); 62 long bytesRead = clientChannel.read(byteBuffer); 63 while (bytesRead>0){ 64 byteBuffer.flip(); 65 byte[] data = byteBuffer.array(); 66 String info = new String(data).trim(); 67 System.out.println("從客戶端發送過來的消息是:"+info); 68 byteBuffer.clear(); 69 bytesRead = clientChannel.read(byteBuffer); 70 } 71 if (bytesRead==-1){ 72 clientChannel.close(); 73 } 74 } 75 76 public void doWrite(SelectionKey key) throws IOException { 77 ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); 78 byteBuffer.flip(); 79 SocketChannel clientChannel = (SocketChannel) key.channel(); 80 while (byteBuffer.hasRemaining()){ 81 clientChannel.write(byteBuffer); 82 } 83 byteBuffer.compact(); 84 } 85 86 public static void main(String[] args) throws IOException { 87 MyNioServer myNioServer = new MyNioServer(); 88 myNioServer.initServer(); 89 } 90 }
客戶端:
1 package cn.blog.test.NioTest; 2 3 4 import java.io.IOException; 5 import java.net.InetSocketAddress; 6 import java.nio.ByteBuffer; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 12 public class MyNioClient { 13 private Selector selector; //建立一個選擇器 14 private final static int port = 8686; 15 private final static int BUF_SIZE = 10240; 16 private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); 17 18 private void initClient() throws IOException { 19 this.selector = Selector.open(); 20 SocketChannel clientChannel = SocketChannel.open(); 21 clientChannel.configureBlocking(false); 22 clientChannel.connect(new InetSocketAddress(port)); 23 clientChannel.register(selector, SelectionKey.OP_CONNECT); 24 while (true){ 25 selector.select(); 26 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); 27 while (iterator.hasNext()){ 28 SelectionKey key = iterator.next(); 29 iterator.remove(); 30 if (key.isConnectable()){ 31 doConnect(key); 32 }else if (key.isReadable()){ 33 doRead(key); 34 } 35 } 36 } 37 } 38 39 public void doConnect(SelectionKey key) throws IOException { 40 SocketChannel clientChannel = (SocketChannel) key.channel(); 41 if (clientChannel.isConnectionPending()){ 42 clientChannel.finishConnect(); 43 } 44 clientChannel.configureBlocking(false); 45 String info = "服務端你好!!"; 46 byteBuffer.clear(); 47 byteBuffer.put(info.getBytes("UTF-8")); 48 byteBuffer.flip(); 49 clientChannel.write(byteBuffer); 50 //clientChannel.register(key.selector(),SelectionKey.OP_READ); 51 clientChannel.close(); 52 } 53 54 public void doRead(SelectionKey key) throws IOException { 55 SocketChannel clientChannel = (SocketChannel) key.channel(); 56 clientChannel.read(byteBuffer); 57 byte[] data = byteBuffer.array(); 58 String msg = new String(data).trim(); 59 System.out.println("服務端發送消息:"+msg); 60 clientChannel.close(); 61 key.selector().close(); 62 } 63 64 public static void main(String[] args) throws IOException { 65 MyNioClient myNioClient = new MyNioClient(); 66 myNioClient.initClient(); 67 } 68 }
在早期的JDK1.4和1.5 update10版本以前,Selector基於select/poll模型實現,是基於IO複用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。另聽說Buffer指向的並不是堆內內存,NIO使用 Native 函數庫直接分配堆外內存,而後經過一個存儲在 Java 堆的 DirectByteBuffer 對象做爲這塊內存的引用進行操做,避免了在 Java 堆和 Native 堆中來回複製數據。
NIO的實現解析可參看:深刻淺出NIO Socket實現機制
Reactor模式
NIO爲實現Reactor模式提供了基礎,上面的NIO圖示其實就是Reactor模式的雛形,只是Reactor以OO的方式抽象出了幾個概念,使得職責劃分更加明確。
基於上述三個角色畫出Reactor模式圖以下:
如此,Reactor模式便很是清晰地展示在咱們眼前。那麼業務線程如何與Reactor交互呢?由前文所知,數據存取於Buffer,具體操做由Handler負責。socket.read()將數據讀入Buffer,須要一種機制將Buffer引用推送給業務線程;一樣,業務線程返回的數據須要寫入Buffer,按Reactor模式,寫入後還須要註冊write事件,socket可寫後write()。若是直接調用的話,至少Handler和業務代碼會耦合在一塊兒,常見的解耦方式是定義接口,或使用消息中間件。
其它
話說回來,因爲相對短暫的歷史以及相對封閉的環境,.Net社區缺乏不少概念的演化、探究和討論,這也致使了.Neter們這些概念的缺失。雖然從語言層面上來講,C#和Java大同小異,前者甚至必定程度的有語法上的便利,然而只有認識到了其背後的思想和模式,才能真正用好這門語言,這就是.Neter須要瞭解Java及其歷史的緣由,畢竟.Net一開始就是參照着Java來的。
好比.Net裏的堆棧概念,就算一些經典書籍都沒有很是深刻的說明,而Java方面的資料就不少了,參看深刻理解JVM—JVM內存模型
其它參考資料: