Netty源碼分析--NIO(一)

      很久沒寫博客了,最近打算花些時間把Netty的源碼好好讀一讀,下面是本人在學習的過程當中的一些筆記,不能確保本身思考的徹底是正確的,若是有錯誤,歡迎你們指正。java

      因爲本人的語文功底爛的很,通篇使用大白話來說解0.0,有一些概念上的東西,博主可能不會明確的給出定義,建議使用過Netty的同窗一塊兒來研究。api

      好了,咱們一塊兒來看下吧。服務器

      Netty 是一款用於快速開發的高性能的網絡應用程序的Java框架。說到Netty, 咱們先對幾種I/O模型進行一下比對:網絡

       

       那麼僞異步IO是啥呢?框架

       其實就是加入了線程池(ThreadPoolExecutor),對接入的客戶端的Socket封裝成task,實現了Runnable接口,而後投遞到線程池中處理,這樣就避免了BIO那種一個客戶端鏈接一個IO線程的狀況,防止資源耗盡和宕機。可是這種方式底層的通訊依然採用了同步阻塞模型,沒法從根本上解決問題。dom

       那麼AIO又是啥呢?異步

       NIO2.0 引入了新的一步通道的概念,並提供了異步文件通道和異步套接字的實現。它不須要經過多路複用器對註冊的通道進行輪詢操做便可實現異步讀寫,屬於真正意義上的異步非阻塞IO。socket

       一、經過java.util.concurrent.Future 類來異步獲取操做的結果。post

       二、在執行異步操做的時候傳入一個CompletionHandler接口的實現類,做爲操做完成的回調。性能

            接口有如下兩個方法。     

 1 /**
 2  * Invoked when an operation has completed.
 3  *
 4  * @param   result
 5  *          The result of the I/O operation.
 6  * @param   attachment
 7  *          The object attached to the I/O operation when it was initiated.
 8  */
 9 void completed(V result, A attachment);
10 
11 /**
12  * Invoked when an operation fails.
13  *
14  * @param   exc
15  *          The exception to indicate why the I/O operation failed
16  * @param   attachment
17  *          The object attached to the I/O operation when it was initiated.
18  */
19 void failed(Throwable exc, A attachment);

 

       好的,下面也稍微回顧一下NIO,以及NIO涉及的幾個關鍵組件:

  •  緩衝區 Buffer 
  •  通道 Channel
  •  多路複用器 Selector
  1. Buffer : 看什麼都不如看官方文檔來的更準確,下面是官方Buffer javadoc內容,咱們來看下: 

          

         裏面講述了,buffer抽象類 是一個數據容器,除了內容,還有一些屬性,capacity、limit、position。

         capacity 是容器的容量,這個值一旦被建立,就沒法修改。 limit 是 不該該被讀或寫的第一個元素的位置。 position 是指下一個將會被讀或寫的位置,這個值必定小於等於limit。

         另外javadoc中還提到了mark和reset, 其中mark其實就是打一個標記,把當前的position賦給mark。  那麼 reset 的 描述是這樣的 把當前的position 改爲以前mark的位置。

         

         ok,由上面的文檔能夠得出下面的順序  0 <= mark <= position <= limit <= capacity

         其實Buffer中還有一個很是重要的方法必需要說一下,那就是 flip() ,看下javadoc

         

         這個其實就是把 當前的limit = position, position = 0, 固然若是以前有mark也會失效,設置成-1, 當你往buffer中寫了數據的時候,只有執行flip()方法, 才能夠正確的讀取數據,  doc中還指出這個方法常常和compact()方法連着用。一樣,貼出javadoc:

        

       至關於什麼呢,就至關因而清理掉已經讀取過得數據,好比 position = 5 , limit = 10,前5個數據經讀取過了,那麼將新建一個buffer,將當前position到limit的數據拷貝到一個新的Buffer中,那麼新的buffer的postion = limit-postion, limit = capacity, 好了,看源碼是這樣的,接下來就是驗證一下了:

 1 ByteBuffer buffer = ByteBuffer.allocate(10);
 2 buffer.put("helloworld".getBytes());
 3 System.out.println(buffer.position() + ":" + buffer.limit());
 4 buffer.flip();
 5 System.out.println(buffer.position() + ":" + buffer.limit());
 6 byte[] bytes = new byte[buffer.limit() + 1];
 7 for(int i=0; i<6; i++) {
 8     bytes[i] = buffer.get();
 9 }
10 System.out.println(new String(bytes));
11 System.out.println(buffer.position() + ":" + buffer.limit());
12 System.out.println(buffer);
13 buffer.compact();
14 System.out.println(buffer.position() + ":" + buffer.limit());
15 System.out.println(buffer);

測試結果以下:

10:10
0:10
hellow     
6:10
java.nio.HeapByteBuffer[pos=6 lim=10 cap=10]
4:10
java.nio.HeapByteBuffer[pos=4 lim=10 cap=10]

 好了,Buffer的源碼看到這裏也算是差很少了。

二、Channel

      Channel是一個通道, 它就像自來水管同樣,網絡數據經過Channel讀取與寫入,通道與流的不一樣之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutStream的子類),而通道能夠用於讀、寫或者兩者同時進行, 屬於全雙工。

     這裏咱們也來看下源碼吧,就看ServerSocketChannel

     提供了幾個比較重要的api:  

     public static ServerSocketChannel open() throws IOException; // 經過該方法建立一個Channel

     看下javadoc , 明確說明了 新建立的channel是沒有任何綁定的,在進行accepted以前須要綁定一個地址。

    

     public final ServerSocketChannel bind(SocketAddress local);// 綁定一個端口號

     public abstract SocketChannel accept() throws IOException; // 接收新的客戶端

     

 三、Selector 多路複用器 ,簡單來講呢,Selector 會不斷的輪訓註冊在其上的Channel, 若是某個Channel上面發生了讀寫等事件,這個Channel就會處理就緒狀態, 會被Selector輪訓出來,而後拿到SelectionKey Set集合,從而獲取到每個就緒狀態的Channel,進行後續的I/O操做。

       因爲JDK使用了epoll() 代替傳統的select實現,因此沒有最大句柄的1024/2048的限制, 只須要一個線程負責Selector的輪訓,就能夠接入成千上萬的客戶端。NB

       

        channel將會經過一個SelectionKey註冊到一個selector上,一個selector 經過 open方法去建立。

       

        這一段着重指出,selectionKey集合只能經過 set 集合的 remove() 方法 或者 一個迭代器的 remove() 方法來移除。其他的方法都不能夠修改 selected-key 。

        好了,看到這裏,有些朋友可能似懂非懂,可是看下下面的單元測試一會兒就懂了。

        這段代碼實現了Nio的服務器端,接收到客戶端消息後,而後通知全部的客戶端。

 1     private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap();
 2 
 3     public static void main(String[] args) {
 4 
 5         try {
 6             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  // 建立一個Channel
 7             serverSocketChannel.configureBlocking(false); // 設置爲非阻塞
 8             serverSocketChannel.bind(new InetSocketAddress(8899)); // 綁定端口
 9 
10             Selector selector = Selector.open(); // 建立一個Selector
11             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 將Channel註冊到Selector上,設置selectionKey 爲 accept, 準備接收新的客戶端鏈接
12 
13             while (true) { // 死循環不斷輪訓,查看 是否有準備就緒的channel
14                 selector.select();  // 阻塞等到就緒的channel
15                 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 獲取到就緒的selectionKeys集合
16                 selectionKeys.forEach(value -> {
17                     try {
18                         if(value.isAcceptable()) { // 接收新的客戶端事件
19                             ServerSocketChannel channel = (ServerSocketChannel)value.channel(); // 獲取channel
20                             SocketChannel clientChannel = channel.accept(); // 獲取客戶端的 socketChannel
21                             clientChannel.configureBlocking(false); // 設置爲非阻塞
22                             String clientId = UUID.randomUUID().toString();
23                             System.out.println("客戶端接入" + clientId); 
24                             clientMap.put(clientId, clientChannel);
25                             clientChannel.register(selector, SelectionKey.OP_READ); // 這裏重點說下, 當接收到新的客戶端後,接下來就是準備接收數據,因此這裏就是註冊的是Read事件
// 而且這裏註冊到selector上的是客戶端對應的SocketChannel, 而不是ServerSocketChannel,
// 由於ServerScoketChannel只負責接收新的客戶端
26 } else if(value.isReadable()) { // 接收到read事件 27 SocketChannel clientChannel = (SocketChannel)value.channel(); // 因此這裏是SocketChannel 28 ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配內存 29 int count = clientChannel.read(buffer); // 寫channel中的數據到Buffer中 30 if (count > 0) { 31 buffer.flip(); // 寫完以後,必定要執行flip。轉化成讀 32 Charset charset = Charset.forName("utf-8"); 33 String receiveMsg = String.valueOf(charset.decode(buffer).array()); 34 System.out.println("receiveMsg = " +receiveMsg); 35 Iterator<Map.Entry<String, SocketChannel>> it = clientMap.entrySet().iterator(); 36 String sendClient = null; 37 while (it.hasNext()) { 38 Map.Entry<String, SocketChannel> next = it.next(); 39 if(next.getValue() == clientChannel) { 40 sendClient = next.getKey(); 41 break; 42 } 43 } 44 it = clientMap.entrySet().iterator(); 45 ByteBuffer writeBuffer = ByteBuffer.allocate(1024); 46 while (it.hasNext()) { 47 SocketChannel socketChannel = it.next().getValue(); 48 writeBuffer.clear(); 49 writeBuffer.put(("sendClient:" + sendClient + "發送了消息").getBytes()); 50 writeBuffer.flip(); 51 socketChannel.write(writeBuffer); 52 } 53 } 54 } 55 } catch (Exception e) { 56 e.printStackTrace(); 57 } 58 }); 59 selectionKeys.clear(); // 每次處理完這一批selectionKeys,必定要清空掉集合。 60 } 61 62 } catch (IOException e) { 63 e.printStackTrace(); 64 } finally { 65 } 66 }

    ok, 上面是我本身的一些理解,若是有問題歡迎你們指正。下一篇,咱們將開始學習Netty的源碼。

相關文章
相關標籤/搜索