架構設計:系統間通訊(5)——IO通訊模型和JAVA實踐 下篇

七、異步IO

上面兩篇文章中,咱們分別講解了阻塞式同步IO、非阻塞式同步IO、多路複用IO 這三種IO模型,以及JAVA對於這三種IO模型的支持。重點說明了IO模型是由操做系統提供支持,且這三種IO模型都是同步IO,都是採用的「應用程序不詢問我,我毫不會主動通知」的方式。 java

異步IO則是採用「訂閱-通知」模式:即應用程序向操做系統註冊IO監聽,而後繼續作本身的事情。當操做系統發生IO事件,而且準備好數據後,在主動通知應用程序,觸發相應的函數: 程序員

這裏寫圖片描述

  • 和同步IO同樣,異步IO也是由操做系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O Completion Port,I/O完成端口); apache

  • Linux下因爲沒有這種異步IO技術,因此使用的是epoll(上文介紹過的一種多路複用IO技術的實現)對異步IO進行模擬json

八、JAVA的支持(JAVA AIO)

8-一、JAVA AIO框架簡析

這裏寫圖片描述

  • 一樣的猶如《架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇》中對JAVA NIO框架的實現分析,這裏也沒有將JAVA AIO框架全部的實現類畫完,只是經過這個結構分析要告訴各位讀者JAVA AIO中類設計和操做系統的相關性 windows

  • 在文中咱們一再說明JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路複用IO技術模擬異步IO,這個從JAVA AIO框架的部分類設計上就能夠看出來。例如框架中,在Windows下負責實現套接字通道的具體類是 「sun.nio.ch.WindowsAsynchronousSocketChannelImpl」,其引用的IOCP類型文檔註釋如是: 緩存

/**
* Windows implementation of AsynchronousChannelGroup encapsulating an I/O
* completion port.
*/ 服務器

若是您感興趣,固然能夠去看看所有完整代碼(建議從「java.nio.channels.spi.AsynchronousChannelProvider」這個類看起)。 網絡

  • 特別說明一下,請注意圖中的「java.nio.channels.NetworkChannel」接口,這個接口一樣被JAVA NIO框架實現了,以下圖所示:
    這裏寫圖片描述

8-二、代碼實例

下面,咱們經過一個代碼示例,來說解JAVA AIO框架的具體使用,先上代碼,在針對代碼編寫和運行中的要點進行講解: 架構

package testASocket; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; /** * JAVA AIO框架測試。請必定將 * 《架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇》看了後再看本篇測試代碼。 * 這樣對您理解代碼的關鍵點很是有益。 * @author yinwenjie */ public class SocketServer { static {
        BasicConfigurator.configure();
    } private static final Object waitObject = new Object(); /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { /* * 對於使用的線程池技術,我必定要多說幾句 * 一、Executors是線程池生成工具,經過這個工具咱們能夠很輕鬆的生成「固定大小的線程池」、「調度池」、「可伸縮線程數量的池」。具體請看API Doc * 二、固然您也能夠經過ThreadPoolExecutor直接生成池。 * 三、這個線程池是用來獲得操做系統的「IO事件通知」的,不是用來進行「獲得IO數據後的業務處理的」。要進行後者的操做,您能夠再使用一個池(最好不要混用) * 四、您也能夠不使用線程池(不推薦),若是決定不使用線程池,直接AsynchronousServerSocketChannel.open()就好了。 * */ ExecutorService threadPool = Executors.newFixedThreadPool(20);
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool); final AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); //設置要監聽的端口「0.0.0.0」表明本機全部IP設備 serverSocket.bind(new InetSocketAddress("0.0.0.0", 83)); //爲AsynchronousServerSocketChannel註冊監聽,注意只是爲AsynchronousServerSocketChannel通道註冊監聽 //並不包括爲 隨後客戶端和服務器 socketchannel通道註冊的監聽 serverSocket.accept(null, new ServerSocketChannelHandle(serverSocket)); //等待,以便觀察現象(這個和要講解的原理自己沒有任何關係,只是爲了保證守護線程不會退出) synchronized(waitObject) {
            waitObject.wait();
        }
    }
} /** * 這個處理器類,專門用來響應 ServerSocketChannel 的事件。 * 還記得咱們在《架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇》中所提到的內容嗎?ServerSocketChannel只有一種事件:接受客戶端的鏈接 * @author yinwenjie */ class ServerSocketChannelHandle implements CompletionHandler<AsynchronousSocketChannel, Void> { /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(ServerSocketChannelHandle.class); private AsynchronousServerSocketChannel serverSocketChannel; /** * @param serverSocketChannel */ public ServerSocketChannelHandle(AsynchronousServerSocketChannel serverSocketChannel) { this.serverSocketChannel = serverSocketChannel;
    } /** * 注意,咱們分別觀察 this、socketChannel、attachment三個對象的id。 * 來觀察不一樣客戶端鏈接到達時,這三個對象的變化,以說明ServerSocketChannelHandle的監聽模式 */ @Override public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
        ServerSocketChannelHandle.LOGGER.info("completed(AsynchronousSocketChannel result, ByteBuffer attachment)"); //每次都要從新註冊監聽(一次註冊,一次響應),可是因爲「文件狀態標示符」是獨享的,因此不須要擔憂有「漏掉的」事件 this.serverSocketChannel.accept(attachment, this); //爲這個新的socketChannel註冊「read」事件,以便操做系統在收到數據並準備好後,主動通知應用程序 //在這裏,因爲咱們要將這個客戶端屢次傳輸的數據累加起來一塊兒處理,因此咱們將一個stringbuffer對象做爲一個「附件」依附在這個channel上 // ByteBuffer readBuffer = ByteBuffer.allocate(50);
        socketChannel.read(readBuffer, new StringBuffer(), new SocketChannelReadHandle(socketChannel , readBuffer));
    } /* (non-Javadoc) * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object) */ @Override public void failed(Throwable exc, Void attachment) {
        ServerSocketChannelHandle.LOGGER.info("failed(Throwable exc, ByteBuffer attachment)");
    }
} /** * 負責對每個socketChannel的數據獲取事件進行監聽。<p> * * 重要的說明:一個socketchannel都會有一個獨立工做的SocketChannelReadHandle對象(CompletionHandler接口的實現), * 其中又都將獨享一個「文件狀態標示」對象FileDescriptor、 * 一個獨立的由程序員定義的Buffer緩存(這裏咱們使用的是ByteBuffer)、 * 因此不用擔憂在服務器端會出現「竄對象」這種狀況,由於JAVA AIO框架已經幫您組織好了。<p> * * 可是最重要的,用於生成channel的對象:AsynchronousChannelProvider是單例模式,不管在哪組socketchannel, * 對是一個對象引用(但這不要緊,由於您不會直接操做這個AsynchronousChannelProvider對象)。 * @author yinwenjie */ class SocketChannelReadHandle implements CompletionHandler<Integer, StringBuffer> { /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(SocketChannelReadHandle.class); private AsynchronousSocketChannel socketChannel; /** * 專門用於進行這個通道數據緩存操做的ByteBuffer<br> * 固然,您也能夠做爲CompletionHandler的attachment形式傳入。<br> * 這是,在這段示例代碼中,attachment被咱們用來記錄全部傳送過來的Stringbuffer了。 */ private ByteBuffer byteBuffer; public SocketChannelReadHandle(AsynchronousSocketChannel socketChannel , ByteBuffer byteBuffer) { this.socketChannel = socketChannel; this.byteBuffer = byteBuffer;
    } /* (non-Javadoc) * @see java.nio.channels.CompletionHandler#completed(java.lang.Object, java.lang.Object) */ @Override public void completed(Integer result, StringBuffer historyContext) { //若是條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就能夠了 if(result == -1) { try { this.socketChannel.close();
            } catch (IOException e) {
                SocketChannelReadHandle.LOGGER.error(e);
            } return;
        }

        SocketChannelReadHandle.LOGGER.info("completed(Integer result, Void attachment) : 而後咱們來取出通道中準備好的值"); /* * 實際上,因爲咱們從Integer result知道了本次channel從操做系統獲取數據總長度 * 因此實際上,咱們不須要切換成「讀模式」的,可是爲了保證編碼的規範性,仍是建議進行切換。 * * 另外,不管是JAVA AIO框架仍是JAVA NIO框架,都會出現「buffer的總容量」小於「當前從操做系統獲取到的總數據量」, * 但區別是,JAVA AIO框架中,咱們不須要專門考慮處理這樣的狀況,由於JAVA AIO框架已經幫咱們作了處理(作成了屢次通知) * */ this.byteBuffer.flip(); byte[] contexts = new byte[1024]; this.byteBuffer.get(contexts, 0, result); this.byteBuffer.clear(); try {
            String nowContent = new String(contexts , 0 , result , "UTF-8");
            historyContext.append(nowContent);
            SocketChannelReadHandle.LOGGER.info("================目前的傳輸結果:" + historyContext);
        } catch (UnsupportedEncodingException e) {
            SocketChannelReadHandle.LOGGER.error(e);
        } //若是條件成立,說明尚未接收到「結束標記」 if(historyContext.indexOf("over") == -1) { return;
        } //========================================================================= // 和上篇文章的代碼相同,咱們以「over」符號做爲客戶端完整信息的標記 //========================================================================= SocketChannelReadHandle.LOGGER.info("=======收到完整信息,開始處理業務=========");
        historyContext = new StringBuffer(); //還要繼續監聽(一次監聽一次通知) this.socketChannel.read(this.byteBuffer, historyContext, this);
    } /* (non-Javadoc) * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object) */ @Override public void failed(Throwable exc, StringBuffer historyContext) {
        SocketChannelReadHandle.LOGGER.info("=====發現客戶端異常關閉,服務器將關閉TCP通道"); try { this.socketChannel.close();
        } catch (IOException e) {
            SocketChannelReadHandle.LOGGER.error(e);
        }
    }
}

8-2-一、要點講解

  • 注意在JAVA NIO框架中,咱們說到了一個重要概念「selector」(選擇器)。它負責代替應用查詢中全部已註冊的通道到操做系統中進行IO事件輪詢、管理當前注 冊的通道集合,定位發生事件的通道等操操做;可是在JAVA AIO框架中,因爲應用程序不是「輪詢」方式,而是訂閱-通知方式,因此再也不須要「selector」(選擇器)了,改由channel通道直接到操做系統註冊監聽併發

  • JAVA AIO框架中,只實現了兩種網絡IO通道「AsynchronousServerSocketChannel」(服務器監聽通道)、 「AsynchronousSocketChannel」(socket套接字通道)。可是不管哪一種通道他們都有獨立的fileDescriptor(文 件標識符)、attachment(附件,附件可使任意對象,相似「通道上下文」),並被獨立的SocketChannelReadHandle類實例 引用。咱們經過debug操做來看看它們的引用結構:

在測試過程當中,咱們啓動了兩個客戶端(客戶端用什麼語言來寫都行,用阻塞或者非阻塞方式也都行,只要是支持 TCP Socket套接字的就行。若是您非要看看客戶端是怎麼寫的,您能夠參見個人《架構設計:系統間通訊(3)——IO通訊模型和JAVA實踐 上篇》這篇文章中的客戶端代碼示例),而後咱們觀察服務器端對這兩個客戶端通道的處理狀況:

這裏寫圖片描述

能夠看到,在服務器端分別爲客戶端1和客戶端2建立的兩個WindowsAsynchronousSocketChannelImpl對象爲:

這裏寫圖片描述

客戶端1:WindowsAsynchronousSocketChannelImpl:760 | FileDescriptor:762

客戶端2:WindowsAsynchronousSocketChannelImpl:792 | FileDescriptor:797

接下來,咱們讓兩個客戶端發送信息到服務器端,並觀察服務器端的處理狀況。客戶端1發來的消息和客戶端2發來的消息,在服務器端的處理狀況以下圖所示:

這裏寫圖片描述

客戶端1:WindowsAsynchronousSocketChannelImpl:760 | FileDescriptor:762 | SocketChannelReadHandle:803 | HeapByteBuffer:808

客戶端2:WindowsAsynchronousSocketChannelImpl:792 | FileDescriptor:797 | SocketChannelReadHandle:828 | HeapByteBuffer:833

能夠明顯看到,服務器端處理每個客戶端通道所使用的SocketChannelReadHandle(處理器)對象都是獨立的,而且所引用的SocketChannel對象都是獨立的

  • JAVA NIO和JAVA AIO框架,除了由於操做系統的實現不同而去掉了Selector外,其餘的重要概念都是存在的,例如上文中提到的Channel的概念,還有演示代碼 中使用的Buffer緩存方式。實際上JAVA NIO和JAVA AIO框架您能夠當作是一套完整的「高併發IO處理」的實現。

8-2-二、還有改進可能

固然,以上代碼是示例代碼,目標是爲了讓您瞭解JAVA AIO框架的基本使用。因此它還有不少改造的空間,例如:

  • 在生產環境下,咱們須要記錄這個通道上「用戶的登陸信息」。那麼這個需求可使用JAVA AIO中的「附件」功能進行實現。

  • 咱們在本文和上文(《架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇》)中,都是使用「自定義文本」格式傳輸內容,並檢查「over」關鍵字。可是在正式生產環境下,您會這樣用嗎?

  • 顯然是不會的,由於它壓縮率不高。要麼咱們會使用json格式:由於它在相同的壓縮率的前提下,有更好的信息結構;咱們還可使用 protobuffer:由於它兼顧傳輸效率和良好的信息結構;甚至還可使用TLV格式:提供很好的信息傳輸效率(它連一個多餘的byte描述都沒 有),這幾種格式的講解,您能夠參考《架構設計:系統間通訊(1)——概述從「聊天」開始上篇》。

  • 記住JAVA AIO 和 JAVA NIO 框架都是要使用線程池的(固然您也能夠不用),線程池的使用原則,必定是只有業務處理部分才使用, 使用後立刻結束線程的執行(還回線程池或者消滅它)。JAVA AIO框架中還有一個線程池,是拿給「通知處理器」使用的,這是由於JAVA AIO框架是基於「訂閱-通知」模型的,「訂閱」操做能夠由主線程完成,可是您總不能要求在應用程序中併發的「通知」操做也在主線程上完成吧^_^。

  • 最好的改進方式,固然就是使用Netty或者Mina咯。

8-三、爲何還有Netty

那麼有的讀者可能就會問,既然JAVA NIO / JAVA AIO已經實現了各主流操做系統的底層支持,那麼爲何如今主流的JAVA NIO技術會是Netty和MINA呢?答案很簡單:由於更好用,這裏舉幾個方面的例子:

  • 雖然JAVA NIO 和 JAVA AIO框架提供了 多路複用IO/異步IO的支持,可是並無提供上層「信息格式」的良好封裝。例如前二者並無提供針對 Protocol Buffer、JSON這些信息格式的封裝,可是Netty框架提供了這些數據格式封裝(基於責任鏈模式的編碼和解碼功能)

  • 要編寫一個可靠的、易維護的、高性能的(注意它們的排序)NIO/AIO 服務器應用。除了框架自己要兼容實現各種操做系統的實現外。更重要的是它應該還要處理不少上層特有服務,例如:客戶端的權限、還有上面提到的信息格式封 裝、簡單的數據讀取。這些Netty框架都提供了響應的支持。

  • JAVA NIO框架存在一個poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能block意味着CPU的使用率會變成100%(這是底層JNI的問題,上層要處理這個異常實際 上也好辦)。固然這個bug只有在Linux內核上才能重現。

  • 這個問題在JDK 1.7版本中尚未被徹底解決:http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719。雖然Netty 4.0中也是基於JAVA NIO框架進行封裝的(上文中已經給出了Netty中NioServerSocketChannel類的介紹),可是Netty已經將這個bug進行了處理。

  • 其餘緣由,用過Netty後,您就能夠本身進行比較了。

九、後文預告

經過三篇文章,咱們把操做系統的四種IO模型都進行了介紹,而且說明了JAVA對這四種IO模型的支持,也給出了代碼講解。有讀者反映仍是不夠深 入,例如典型的EPOLL技術的工做細節並無講解,也沒有進行各類IO模型的性能比較,等等。別慌,我計劃將來的3-4個月咱們都會討論「系統間通訊技 術」,因此就想作「負載均衡」那個系列的專欄同樣,咱們會在後面的時間進行補全。固然本人的技術水平有限,寫博客的目的主要也是爲了分享和總結,因此歡迎 各位讀者多多吐槽。

從下篇文章開始,咱們將話一到兩篇文章的內容,討論Netty框架(以Netty4.0版本做爲討論基礎)。隨後咱們將開始介紹JAVA 的RIM,並從RIM引導進入RPC技術的介紹。

相關文章
相關標籤/搜索