NIO系列1:框架拆解

最近一年用NIO寫了很多網絡程序,也研究了一些開源NIO網絡框架netty、mina等,總結了一下NIO的架構特色。java

不管是netty仍是mina它們都在java原生NIO的基礎上進行了完善的封裝,雖然細節有所不一樣,但整體架構思路一致,都大概劃分出了如下幾個組成部分:react

- - transport:傳輸層的抽象程序員

- - protocol: 協議codec的抽象緩存

- - event model:統一事件模型安全

- - buffer:底層buffer封裝網絡

在徹底屏蔽底層API的同時,對上層應用提供了自身的統一API接口。session

框架進行黑盒封裝的同時,再進行通用化的接口開放,帶來的好處是統一化,但壞處是程序的透明度下降,抽象度提升,增長理解難度和實現難度。多線程

 

下面說說每一個部分的一些設計考量:架構

transport傳輸層抽象都是對java原生NIO API的封裝,在這一層封裝的程度在於框架的實現目標。例如mina立足於通用的網絡框架,所以徹底屏蔽了原生的API,提供了自身的統一接口,由於它不只須要封裝NIO的API還有一系列其餘類型的IO操做的API,提供統一API接口。爲了通用兼顧各種傳輸通道所以可能不得不暴露多餘的API接口,使用方需甄別傳輸通道的不一樣,增長了理解難度。併發

protocol封裝各種經常使用協議的codec操做,但目前這些網絡框架的codec實現都與自身的API緊密綁定,下降了可重用性。

event model 事件模型的設計一般不能徹底獨立,例如java NIO自己的模型是事件驅動的,但傳統阻塞型IO並不是事件驅動,要兼顧兩者一般要付出額外的代價和開銷。

有一種說法是讓異步IO同步化使用(由於同步化使用更簡單,異步致使了業務處理的碎片化)到底對不對值得商榷?模型阻抗致使的代價和開銷屏蔽在了黑盒中,也容易誤導應用程序員對本該採用同步化處理的業務卻濫用了異步化機制,並不會帶來什麼好處。

buffer 一般都用來配合底層IO數據流和協議codec使用,自己是否適合暴露給應用方取決於框架是否整合codec,由於codec自己帶有業務性質,而純粹的IO數據流處理使用的buffer則徹底無需暴露給應用方。

 

以上簡單說了下NIO框架各部分的設計考量,能夠看出目前流行的NIO框架(netty和mina)都在走一條相似「瑞士軍刀」的路線,集各類功能與一身(多種IO封裝、協議封裝),但你又很難把瑞士軍刀上的某個刀片拆下來單獨使用。

在實踐中感受,考慮從單一性、簡潔性、重用性、組合性、透明性幾個方面去設計原子化的IO組件也許更可取,更像是一種「工具箱」路線。

 

典型的事件驅動模型NIO框架組件交互圖以下:

Acceptor:  負責監聽鏈接事件負責接入

Processor:負責IO讀寫事件處理

EventDispatcher:負責事件派發

Handler:業務處理器

後面將經過一個系列文章來討論一個原子化的NIO組件實現的細節及設計考量。

 

NIO系列2:TCP監聽綁定

分類: 踏莎行·術

注:本文適合對象需對java NIO API的使用及異步事件模型(Reactor模式)有必定程度的瞭解,主要講述使用java原生NIO實現一個TCP監聽綁定的過程及細節設計。

 

咱們一開始設計了一個TCP接入服務類,這個類提供了一個API方法提供對本地一系列地址(端口)的監聽綁定,類初始化後完成Selector的open操做以下:

 

[java]  view plain copy
 
  1. selector = Selector.open();  

提供的綁定API,其方法簽名以下:

 

[java]  view plain copy
 
  1. /** 
  2.  * Binds to the specified local addresses and start to accept incoming connections. If any address binding failed then 
  3.  * rollback the already binding addresses. Bind is fail fast, if encounter the first bind exception then throw it immediately. 
  4.  *  
  5.  * @param firstLocalAddress 
  6.  * @param otherLocalAddresses 
  7.  * @throws throw if bind failed. 
  8.  */  
  9. synchronized public void bind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses) throws IOException;  

爲什麼須要同步?由於咱們不但願多個線程同時調用該方法,致使地址綁定異常。

 

參數中能夠傳遞多個本地地址(端口)同時進行監聽綁定。

在NIO的綁定過程當中需進行事件註冊(對OP_ACCEPT感興趣),以下:

 

[java]  view plain copy
 
  1. ServerSocketChannel ssc = ServerSocketChannel.open();  
  2. ssc.configureBlocking(false);  
  3. ServerSocket ss = ssc.socket();  
  4. ss.setReuseAddress(config.isReuseAddress());  
  5. ss.bind(address, config.getBacklog());  
  6. ssc.register(selector, SelectionKey.OP_ACCEPT);  

 

 

因爲註冊過程當中除了涉及鎖競爭還可能產生死鎖,因此通常的作法都是將綁定地址放在隊列中進行異步註冊由reactor線程進行處理,例如:

 

[java]  view plain copy
 
  1. bindAddresses.addAll(localAddresses);  
  2. if (!bindAddresses.isEmpty()) {  
  3.     synchronized (lock) {  
  4.         // wake up for unblocking the select() to process binding addresses  
  5.         selector.wakeup();  
  6.   
  7.         // wait for bind result  
  8.         wait0();  
  9.     }  
  10. }  

從同步註冊變爲異步註冊後就存在一個問題,實際註冊綁定時可能存在端口已綁定的異常,在異步狀況下就須要線程間通訊來通知異常消息,並向調用方反饋。

 

如上面代碼片斷中的wait0()方法就是等待綁定結果,若出現綁定異常則拋出

 

[java]  view plain copy
 
  1. private void wait0() throws IOException {  
  2. while (!this.endFlag) {  
  3.     try {  
  4.         lock.wait();  
  5.     } catch (InterruptedException e) {  
  6.         throw new IOException(e);  
  7.     }  
  8. }  
  9.   
  10. // reset end flag  
  11. this.endFlag = false;  
  12.   
  13. if (this.exception != null) {  
  14.     IOException e = exception;  
  15.     this.exception = null;  
  16.     throw e;  
  17. }  

以上代碼也說明了,NIO異步模型轉化爲同步API致使的模型阻抗付出了額外的代價和開銷 --- 線程間通訊。

 

至此,完成了TCP服務監聽過程,下文將進一步講述服務接入和數據傳輸相關設計細節。
 

NIO系列3:TCP服務接入

分類: 踏莎行·術

注:本文適合對象需對java NIO API的使用及異步事件模型(Reactor模式)有必定程度的瞭解,主要講述使用java原生NIO實現一個TCP服務的過程及細節設計。

 

前文講述了NIO TCP服務綁定過程的實現機制,如今能夠開始講述服務監聽啓動後如何和處理接入和數據傳輸相關的細節設計。

在NIO的接入類中有一個Reactor線程,用於處理OP_ACCEPT事件通知,以下:

 

[java]  view plain copy
 
  1. private class AcceptThread extends Thread {  
  2. public void run() {  
  3.     while (selectable) {  
  4.         try {  
  5.             int selected = selector.select();  
  6.               
  7.             if (selected > 0) {  
  8.                 accept();  
  9.             }  
  10.               
  11.             // bind addresses to listen  
  12.             bind0();  
  13.               
  14.             // unbind canceled addresses  
  15.             unbind0();  
  16.         } catch (Exception e) {  
  17.             LOG.error("Unexpected exception caught while accept", e);  
  18.         }  
  19.     }  
  20.       
  21.     // if selectable == false, shutdown the acceptor  
  22.     try {  
  23.         shutdown0();  
  24.     } catch (Exception e) {  
  25.         LOG.error("Unexpected exception caught while shutdown", e);  
  26.     }  
  27. }  

 

 

當有客戶端接入時selector.select()方法返回大於0的整數,並進入accept()方法進行處理,具體以下:

 

[java]  view plain copy
 
  1.  private void accept() {  
  2.     Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  3.     while (it.hasNext()) {  
  4.         SelectionKey key = it.next();  
  5.         it.remove();  
  6.         AbstractSession session = (AbstractSession) acceptByProtocol(key);  
  7.         Processor processor = pool.get(session);  
  8.         session.setProcessor(processor);  
  9.         processor.add(session);  
  10.     }  
  11. }  

 

 

 

[java]  view plain copy
 
  1.  protected Session acceptByProtocol(SelectionKey key) {  
  2.     if (key == null || !key.isValid() || !key.isAcceptable()) {  
  3.            <span style="white-space:pre">       </span>return null;  
  4.        <span style="white-space:pre">   </span>}  
  5.       
  6.     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();  
  7.     SocketChannel sc = null;  
  8.     try {  
  9.         sc = ssc.accept();  
  10.         if(sc == null) {  
  11.             return null;  
  12.         }  
  13.         sc.configureBlocking(false);  
  14.     } catch (IOException e) {  
  15.         LOG.warn(e.getMessage(), e);  
  16.         if(sc != null) {  
  17.             try {  
  18.                 sc.close();  
  19.             } catch (IOException ex) {  
  20.                 LOG.warn(ex.getMessage(), ex);  
  21.             }  
  22.         }  
  23.     }  
  24.       
  25.     Session session = new TcpSession(sc, config);  
  26.       
  27.     return session;  
  28. }  

 


爲每個接入的客戶端經過調用NIO原生accept()方法返回一個SocketChannel的抽象,並封裝成一個session對象(session的概念來自mina框架)

注意:此時與客戶鏈接的通道還沒有註冊對讀/寫事件感興趣,由於它的註冊與前文綁定過程同樣須要異步進行。

所以將封裝通道的session轉交給一個processor對象(io讀寫處理器,該概念也是來自mina),processor內部維持了一個新建session的隊列,在其內部reactor線程循環中進行註冊處理。

有關processor處理讀寫事件的細節設計見下文。

 

 

NIO系列4:TCP服務數據讀寫

分類: 踏莎行·術

注:本文適合對象需對java NIO API的使用及異步事件模型(Reactor模式)有必定程度的瞭解,主要講述使用java原生NIO實現一個TCP服務的過程及細節設計。

 

上文講到當客戶端完成與服務端的鏈接創建後,爲其SocketChannel封裝了一個session對象表明這個鏈接,並交給processor處理。

processor的內部有3個重要的隊列,分別存放新建立的session、須要寫數據的session和準備關閉的session,以下:

 

[java]  view plain copy
 
  1. /** A Session queue containing the newly created sessions */  
  2. private final Queue<AbstractSession> newSessions = new ConcurrentLinkedQueue<AbstractSession>();  
  3.   
  4. /** A queue used to store the sessions to be flushed */  
  5. private final Queue<AbstractSession> flushingSessions = new ConcurrentLinkedQueue<AbstractSession>();  
  6.   
  7. /** A queue used to store the sessions to be closed */  
  8. private final Queue<AbstractSession> closingSessions = new ConcurrentLinkedQueue<AbstractSession>();  

在processor的reactor循環處理線程中,每輪循環的處理包括以下步驟:

 

1. selector.select(),其中爲了處理鏈接超時的狀況,select方法中傳遞了超時參數以避免其永久阻塞,一般是1秒。該方法即時在沒有事件發生時每秒返回一次,進入循環檢測超時

 

[java]  view plain copy
 
  1. int selected = selector.select(SELECT_TIMEOUT);  

2. 從select返回後,首先檢查newSessions隊列是否有新的session加入,併爲其註冊監聽事件(讀事件監聽)。session只有在註冊完事件後,咱們才認爲其狀態爲open並派發打開事件。(關於session狀態,有建立、打開、關閉中、已關閉幾種)

 

 

[java]  view plain copy
 
  1. for (AbstractSession session = newSessions.poll(); session != null; session = newSessions.poll()) {  
  2.     SelectableChannel sc = session.getChannel();  
  3.     SelectionKey key = sc.register(selector, SelectionKey.OP_READ, session);  
  4.     session.setSelectionKey(key);  
  5.       
  6.     // set session state open, so we can read / write  
  7.     session.setOpened();  
  8.       
  9.     // fire session opened event  
  10.     eventDispatcher.dispatch(new Event(EventType.SESSION_OPENED, session, null, handler));  
  11.       
  12.     n++;  
  13. }  

 

 

3. 有讀/寫事件時,進行相關處理,每次讀寫事件發生時更新一次最後的IO時間。

 

[java]  view plain copy
 
  1. // set last IO time  
  2. session.setLastIoTime(System.currentTimeMillis());  
  3.   
  4. // Process reads  
  5. if (session.isOpened() && isReadable(session)) {  
  6.     read(session);  
  7. }  
  8.   
  9. // Process writes  
  10. if (session.isOpened() && isWritable(session)) {  
  11.     asyWrite(session);  
  12. }  

讀取數據時有一個小技巧在於靈活自適應buffer分配(來自mina的一個實現策略),每次判斷讀取到的字節數若乘以2依然小於buffer大小,則收縮buffer爲原來一半,若讀取的字節數已裝滿buffer則擴大一倍。

 

[java]  view plain copy
 
  1. int readBytes = 0;  
  2. int ret;  
  3. while ((ret = ((SocketChannel) session.getChannel()).read(buf)) > 0) {  
  4.     readBytes += ret;  
  5.     if (!buf.hasRemaining()) {  
  6.         break;  
  7.     }  
  8. }  
  9.   
  10. if (readBytes > 0) {  
  11.     if ((readBytes << 1) < session.getReadBufferSize()) {  
  12.         shrinkReadBufferSize(session);  
  13.     } else if (readBytes == session.getReadBufferSize()) {  
  14.         extendReadBufferSize(session);  
  15.     }  
  16.       
  17.     fireMessageReceived(session, buf, readBytes);  
  18. }  
  19.   
  20. // read end-of-stream, remote peer may close channel so close session.  
  21. if (ret < 0) {  
  22.     asyClose(session);  
  23. }  

 

處理寫操做實際上是異步的,老是放入flushSessions中等待寫出。

 

 

[java]  view plain copy
 
  1. private void asyWrite(AbstractSession session) {  
  2. // Add session to flushing queue, soon after it will be flushed in the same select loop.  
  3. flushingSessions.add(session);  

4. 如有須要寫數據的session,則進行flush操做。

寫事件通常默認都是不去關注的,由於在TCP緩衝區可寫或遠端斷開或IO錯誤發生時都會觸發該事件,容易誘發服務端忙循環從而CPU100%問題。爲了保證讀寫公平,寫buffer的大小設置爲讀buffer的1.5倍(來自mina的實現策略),每次寫數據前設置爲對寫事件再也不感興趣。限制每次寫出數據大小的緣由除了避免讀寫不公平,也避免某些鏈接有大量數據須要寫出時一次佔用了過多的網絡帶寬而其餘鏈接的數據寫出被延遲從而影響了公平性。

 

[java]  view plain copy
 
  1. // First set not be interested to write event  
  2. etInterestedInWrite(session, false);  

首先向TCP緩衝區寫出數據(NIO的原生API操做都是不阻塞的)

 

 

[java]  view plain copy
 
  1. int qota = maxWrittenBytes - writtenBytes;  
  2. int localWrittenBytes = write(session, buf, qota);  

寫完後根據返回的寫出數據字節數,可能存在如下多種狀況:

 

- - buffer一次寫完,則派發消息已經發送事件

 

[java]  view plain copy
 
  1. // The buffer is all flushed, remove it from write queue  
  2.     if (!buf.hasRemaining()) {  
  3.         if (LOG.isDebugEnabled()) {  
  4.             LOG.debug("The buffer is all flushed, remove it from write queue");  
  5.         }  
  6.           
  7.         writeQueue.remove();  
  8.           
  9.         // fire message sent event  
  10.         eventDispatcher.dispatch(new Event(EventType.MESSAGE_SENT, session, buf.array(), handler));  
  11.     }  

- - 若返回的寫入字節數爲0,多是TCP緩存buffer已滿,則註冊對寫事件感興趣,稍待下次再寫。

 

 

[java]  view plain copy
 
  1. // 0 byte be written, maybe kernel buffer is full so we re-interest in writing and later flush it.  
  2. if (localWrittenBytes == 0) {  
  3.     if (LOG.isDebugEnabled()) {  
  4.         LOG.debug("0 byte be written, maybe kernel buffer is full so we re-interest in writing and later flush it");  
  5.     }  
  6.       
  7.     setInterestedInWrite(session, true);  
  8.     flushingSessions.add(session);  
  9.     return;  
  10. }  

- - 若一次寫入沒有寫完buffer中的數據,依然註冊對寫事件感興趣,稍待下次再寫。

 

 

[java]  view plain copy
 
  1. // The buffer isn't empty(bytes to flush more than max bytes), we re-interest in writing and later flush it.  
  2.     if (localWrittenBytes > 0 && buf.hasRemaining()) {  
  3.         if (LOG.isDebugEnabled()) {  
  4.             LOG.debug("The buffer isn't empty(bytes to flush more than max bytes), we re-interest in writing and later flush it");  
  5.         }  
  6.           
  7.         setInterestedInWrite(session, true);  
  8.         flushingSessions.add(session);  
  9.         return;  
  10.     }  

- - 一次寫入數據太多時,爲了保證公平性,依然下次再寫入

 

 

[java]  view plain copy
 
  1. // Wrote too much, so we re-interest in writing and later flush other bytes.  
  2. if (writtenBytes >= maxWrittenBytes && buf.hasRemaining()) {  
  3.     if (LOG.isDebugEnabled()) {  
  4.         LOG.debug("Wrote too much, so we re-interest in writing and later flush other bytes");  
  5.     }  
  6.       
  7.     setInterestedInWrite(session, true);  
  8.     flushingSessions.add(session);  
  9.     return;  
  10. }  

5. 有須要關閉的session,則進行關閉操做。引起關閉session的操做可能來自應用方主動關閉,也多是因爲IO異常後自動關閉。因爲關閉session可能存在多線程調用,爲了不鎖同步,咱們經過狀態檢測來規避用鎖機制提升效率。

 

關閉session的操做具體來講就是對channel.close()和key.cancel(),這2個操做後其實尚未徹底釋放socket佔用的文件描述符,需等到下次select()操做後,一些NIO框架會主動調用,因爲咱們這裏select(TIMEOUT)帶有超時參數會自動喚醒,所以不存在這個問題。

 

[java]  view plain copy
 
  1. private int close() throws IOException {  
  2.     int n = 0;  
  3.     for (AbstractSession session = closingSessions.poll(); session != null; session = closingSessions.poll()) {  
  4.         if (LOG.isDebugEnabled()) { LOG.debug("Closing session: " + session); }  
  5.           
  6.         if (session.isClosed()) {  
  7.             if (LOG.isDebugEnabled()) { LOG.debug("Escape close session, it has been closed: " + session); }  
  8.             continue;  
  9.         }  
  10.           
  11.         session.setClosing();  
  12.           
  13.         close(session);  
  14.         n++;  
  15.           
  16.         session.setClosed();  
  17.           
  18.         // fire session closed event  
  19.         eventDispatcher.dispatch(new Event(EventType.SESSION_CLOSED, session, null, handler));  
  20.           
  21.         if (LOG.isDebugEnabled()) { LOG.debug("Closed session: " + session); }  
  22.     }  
  23.     return n;  
 

NIO系列5:事件模型

分類: 踏莎行·術

前文講述了NIO數據讀寫處理,那麼這些數據最終如何被遞交給上層業務程序進行處理的呢?

NIO框架通常都採用了事件派發模型來與業務處理器交互,它與原生NIO的事件機制是模型匹配的,缺點是帶來了業務處理的碎片化。須要業務程序開發者對事件的生命週期有一個清晰的瞭解,不像傳統方式那麼直觀。

事件派發器(EventDispatcher)就成爲了NIO框架中IO處理線程和業務處理回調接口(Handler)之間的橋樑。

因爲業務處理的時間長短是難以肯定的,因此通常事件處理器都會分離IO處理線程,使用新的業務處理線程池來進行事件派發,回調業務接口實現。

 

下面經過一段示例代碼來講明事件的派發過程:

這是processor從網絡中讀取到一段字節後發起的MESSAGE_RECEIVED事件,調用了eventDispatcher.dispatch(Event e)方法。

 

[java]  view plain copy
 
  1. private void fireMessageReceived(AbstractSession session, ByteBuffer buf, int length) {  
  2.         // fire message received event, here we copy buffer bytes to a new byte array to avoid handler expose <code>ByteBuffer</code> to end user.  
  3.         byte[] barr = new byte[length];  
  4.         System.arraycopy(buf.array(), 0, barr, 0, length);  
  5.         eventDispatcher.dispatch(new Event(EventType.MESSAGE_RECEIVED, session, barr, handler));  
  6.     }  

dispatch的方法實現有如下關鍵點須要考慮:

1. 事件派發是多線程的,派發線程最終會調用業務回調接口來進行事件處理,回調接口由業務方實現自身去保證線程併發性和安全性。

2. 對於TCP應用來講,由同一session(這裏可表明同一個鏈接)收到的數據必須保證有序派發,不一樣的session可無序。

3. 不一樣session的事件派發要儘量保證公平性,例如:session1有大量事件產生致使派發線程繁忙時,session2產生一個事件不會由於派發線程都在忙於處理session1的事件而被積壓,session2的事件也能儘快獲得及時派發。

下面是一個實現思路的代碼示例:

 

[java]  view plain copy
 
  1. public void dispatch(Event event) {  
  2.         AbstractSession s = (AbstractSession) event.getSession();  
  3.         s.add(event);  
  4.         if (!s.isEventProcessing()) {  
  5.             squeue.offer(s);  
  6.         }  
  7.     }  

爲了保證每一個session的事件有序,咱們將事件存放在每一個session自身包含的隊列中,而後再將session放入一個公共的阻塞隊列中。

 

有一組worker線程在監聽阻塞隊列,一旦有session進入隊列,它們被激活對session進行事件派發,以下:

 

[java]  view plain copy
 
  1. public void run() {  
  2.             try {  
  3.                 for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) {                   
  4.                     // first check any worker is processing this session? if any other worker thread is processing this event with same session, just ignore it.  
  5.                     synchronized (s) {  
  6.                         if (!s.isEventProcessing()) {  
  7.                             s.setEventProcessing(true);  
  8.                         } else {  
  9.                             continue;  
  10.                         }  
  11.                     }  
  12.                       
  13.                     // fire events with same session  
  14.                     fire(s);  
  15.                       
  16.                     // last reset processing flag and quit current thread processing  
  17.                     s.setEventProcessing(false);  
  18.                       
  19.                     // if remaining events, so re-insert to session queue  
  20.                     if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) {  
  21.                         squeue.offer(s);  
  22.                     }  
  23.                 }  
  24.             } catch (InterruptedException e) {  
  25.                 LOG.warn(e.getMessage(), e);  
  26.             }  
  27.         }  

這裏的要點在於,worker不止一個,但爲了保證每一個session的事件有序咱們只能讓惟一一個線程對session進行處理,所以能夠看到上面的代碼中一開始對session進行了加鎖,並改變了session的狀態(置爲事件處理中)。

 

退出臨界區後,進入事件派發處理方法fire(),在fire()方法退出前其餘線程都沒有機會對該session進行處理,保證了同一時刻只有一個線程進行處理的約束。

若是某個session一直不斷有數據進入,則派發線程可能在fire()方法中停留很長時間,具體看fire()的實現以下:

 

[java]  view plain copy
 
  1. private void fire(Session s) {  
  2.             int count = 0;  
  3.             Queue<Event> q = s.getEventQueue();  
  4.             for (Event event = q.poll(); event != null; event = q.poll()) {  
  5.                 event.fire();  
  6.                 count++;  
  7.                 if (count > SPIN_COUNT) {  
  8.                     // quit loop to avoid stick same worker thread by same session  
  9.                     break;  
  10.                 }  
  11.             }  
  12.         }  

從上面代碼能夠看出,每次fire()的循環數被設置了一個上限,若事件太多時每次達到上限會退出循環釋放線程,等下一次再處理。

 

當前線程釋放對session的控制權只需簡單置事件處理狀態爲false,其餘線程就有機會從新獲取該session的控制權。

在最後退出前爲了不事件遺漏,由於可能當前線程由於處理事件達到上限數被退出循環而又沒有新的事件進入阻塞隊列觸發新的線程激活,則由當前線程主動去從新將該session放入阻塞隊列中激活新線程。

相關文章
相關標籤/搜索