NIO一般採用Reactor模式,AIO一般採用Proactor模式。AIO簡化了程序的編寫,stream的讀取和寫入都有OS來完成,不須要像NIO那樣子遍歷Selector。Windows基於IOCP實現AIO,Linux只有eppoll模擬實現了AIO。
Java7以前的JDK只支持NIO和BIO,從7開始支持AIO。
4種通訊方式:TCP/IP+BIO, TCP/IP+NIO, UDP/IP+BIO, UDP/IP+NIO。
TCP/IP+BIO、
Socket和ServerSocket實現,ServerSocket實現Server端端口監聽,Socket用於創建網絡IO鏈接。
不適用於處理多個請求 1.生成Socket會消耗過多的本地資源。2. Socket鏈接的創建通常比較慢。
BIO狀況下,能支持的鏈接數有限,通常都採起accept獲取Socket之後採用一個thread來處理,one connection one thread。不管鏈接是否有真正數據請求,都須要獨佔一個thread。
能夠經過設立Socket池來必定程度上解決問題,可是使用池須要注意的問題是:1. 競爭等待比較多。 2. 須要控制好超時時間。
TCP/IP+NIO
使用Channel(SocketChannel和ServerSocketChannel)和Selector。
Server端一般由一個thread來監聽connect事件,另外多個thread來監聽讀寫事件。這樣作的好處是這些鏈接只有在真是請求的時候纔會建立thread來處理,one request one thread。這種方式在server端須要支持大量鏈接但這些鏈接同時發送請求的峯值不會不少的時候十分有效。
UDP/IP+BIO
DatagramSocket和DatagramPacket。DatagramSocket負責監聽端口以及讀寫數據,DatagramPacket做爲數據流對象進行傳輸。
UDP/IP是無鏈接的,沒法進行雙向通訊,除非雙方都成爲UDP Server。
UDP/IP+NIO
經過DatagramChannel和ByteBuffer實現。DatagramChannel負責端口監聽及讀寫。ByteBuffer負責數據流傳輸。
若是要將消息發送到多臺機器,若是爲每一個目標機器都創建一個鏈接的話,會有很大的網絡流量壓力。這時候可使用基於UDP/IP的Multicast協議傳輸,Java中能夠經過MulticastSocket和DatagramPacket來實現。
Multicast通常多用於多臺機器的狀態同步,好比JGroups。SRM, URGCP都是Multicast的實現方式。eBay就採用SRM來實現將數據從主數據庫同步到各個搜索節點機器。
Java aio(異步網絡IO)初探
按照《Unix網絡編程》的劃分,IO模型能夠分爲:阻塞IO、非阻塞IO、IO複用、信號驅動IO和異步IO,按照POSIX標準來劃分只分爲兩類:同步IO和異步IO。如何區分呢?首先一個IO操做其實分紅了兩個步驟:發起IO請求和實際的IO操做,同步IO和異步IO的區別就在於第二個步驟是否阻塞,若是實際的IO讀寫阻塞請求進程,那麼就是同步IO,所以阻塞IO、非阻塞IO、IO服用、信號驅動IO都是同步IO,若是不阻塞,而是操做系統幫你作完IO操做再將結果返回給你,那麼就是異步IO。阻塞IO和非阻塞IO的區別在於第一步,發起IO請求是否會被阻塞,若是阻塞直到完成那麼就是傳統的阻塞IO,若是不阻塞,那麼就是非阻塞IO。
Java nio 2.0的主要改進就是引入了異步IO(包括文件和網絡),這裏主要介紹下異步網絡IO API的使用以及框架的設計,以TCP服務端爲例。首先看下爲了支持AIO引入的新的類和接口:
java.nio.channels.AsynchronousChannel
標記一個channel支持異步IO操做。
java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,建立TCP服務端,綁定地址,監聽端口等。
java.nio.channels.AsynchronousSocketChannel
面向流的異步socket channel,表示一個鏈接。
java.nio.channels.AsynchronousChannelGroup
異步channel的分組管理,目的是爲了資源共享。一個AsynchronousChannelGroup綁定一個線程池,這個線程池執行兩個任務:處理IO事件和派發CompletionHandler。AsynchronousServerSocketChannel建立的時候能夠傳入一個 AsynchronousChannelGroup,那麼經過AsynchronousServerSocketChannel建立的 AsynchronousSocketChannel將同屬於一個組,共享資源。
java.nio.channels.CompletionHandler
異步IO操做結果的回調接口,用於定義在IO操做完成後所做的回調工做。AIO的API容許兩種方式來處理異步操做的結果:返回的Future模式或者註冊CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的調用是由 AsynchronousChannelGroup的線程池派發的。顯然,線程池的大小是性能的關鍵因素。AsynchronousChannelGroup容許綁定不一樣的線程池,經過三個靜態方法來建立:
1public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,
2 ThreadFactory threadFactory)
3 throws IOException
4
5public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,
6 int initialSize)
7
8public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)
9 throws IOException
10
須要根據具體應用相應調整,從框架角度出發,須要暴露這樣的配置選項給用戶。
在介紹完了aio引入的TCP的主要接口和類以後,咱們來設想下一個aio框架應該怎麼設計。參考非阻塞nio框架的設計,通常都是採用Reactor模式,Reacot負責事件的註冊、select、事件的派發;相應地,異步IO有個Proactor模式,Proactor負責 CompletionHandler的派發,查看一個典型的IO寫操做的流程來看二者的區別:
Reactor: send(msg) -> 消息隊列是否爲空,若是爲空 -> 向Reactor註冊OP_WRITE,而後返回 -> Reactor select -> 觸發Writable,通知用戶線程去處理 ->先註銷Writable(不少人遇到的cpu 100%的問題就在於沒有註銷),處理Writeable,若是沒有徹底寫入,繼續註冊OP_WRITE。注意到,寫入的工做仍是用戶線程在處理。
Proactor: send(msg) -> 消息隊列是否爲空,若是爲空,發起read異步調用,並註冊CompletionHandler,而後返回。 -> 操做系統負責將你的消息寫入,並返回結果(寫入的字節數)給Proactor -> Proactor派發CompletionHandler。可見,寫入的工做是操做系統在處理,無需用戶線程參與。事實上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三個方法,分別對應於處理成功、失敗、被取消(經過返回的Future)狀況下的回調處理:
1public interface CompletionHandler<V,A> {
2
3 void completed(V result, A attachment);
4
5 void failed(Throwable exc, A attachment);
6
7 void cancelled(A attachment);
8}
其中的泛型參數V表示IO調用的結果,而A是發起調用時傳入的attchment。
在初步介紹完aio引入的類和接口後,咱們看看一個典型的tcp服務端是怎麼啓動的,怎麼接受鏈接並處理讀和寫,這裏引用的代碼都是yanf4j 的aio分支中的代碼,能夠從svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
第一步,建立一個AsynchronousServerSocketChannel,建立以前先建立一個 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel能夠綁定一個 AsynchronousChannelGroup,那麼經過這個AsynchronousServerSocketChannel創建的鏈接都將同屬於一個AsynchronousChannelGroup並共享資源:
1this.asynchronousChannelGroup = AsynchronousChannelGroup
2 .withCachedThreadPool(Executors.newCachedThreadPool(),
3 this.threadPoolSize);
而後初始化一個AsynchronousServerSocketChannel,經過open方法:
1this.serverSocketChannel = AsynchronousServerSocketChannel
2 .open(this.asynchronousChannelGroup);
3
經過nio 2.0引入的SocketOption類設置一些TCP選項:
1this.serverSocketChannel
2 .setOption(
3 StandardSocketOption.SO_REUSEADDR,true);
4this.serverSocketChannel
5 .setOption(
6 StandardSocketOption.SO_RCVBUF,16*1024);
綁定本地地址:
1this.serverSocketChannel
2 .bind(new InetSocketAddress("localhost",8080), 100);
其中的100用於指定等待鏈接的隊列大小(backlog)。完了嗎?尚未,最重要的監聽工做還沒開始,監聽端口是爲了等待鏈接上來以便accept產生一個AsynchronousSocketChannel來表示一個新創建的鏈接,所以須要發起一個accept調用,調用是異步的,操做系統將在鏈接創建後,將最後的結果——AsynchronousSocketChannel返回給你:
1public void pendingAccept() {
2 if (this.started && this.serverSocketChannel.isOpen()) {
3 this.acceptFuture = this.serverSocketChannel.accept(null,
4 new AcceptCompletionHandler());
5
6 } else {
7 throw new IllegalStateException("Controller has been closed");
8 }
9 }
10
注意,重複的accept調用將會拋出PendingAcceptException,後文提到的read和write也是如此。accept方法的第一個參數是你想傳給CompletionHandler的attchment,第二個參數就是註冊的用於回調的CompletionHandler,最後返回結果Future。你能夠對future作處理,這裏採用更推薦的方式就是註冊一個CompletionHandler。那麼accept的CompletionHandler中作些什麼工做呢?顯然一個赤裸裸的 AsynchronousSocketChannel是不夠的,咱們須要將它封裝成session,一個session表示一個鏈接(mina裏就叫 IoSession了),裏面帶了一個緩衝的消息隊列以及一些其餘資源等。在鏈接創建後,除非你的服務器只准備接受一個鏈接,否則你須要在後面繼續調用pendingAccept來發起另外一個accept請求:
1private final class AcceptCompletionHandler implements
2 CompletionHandler<AsynchronousSocketChannel, Object> {
3
4 @Override
5 public void cancelled(Object attachment) {
6 logger.warn("Accept operation was canceled");
7 }
8
9 @Override
10 public void completed(AsynchronousSocketChannel socketChannel,
11 Object attachment) {
12 try {
13 logger.debug("Accept connection from "
14 + socketChannel.getRemoteAddress());
15 configureChannel(socketChannel);
16 AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);
17 Session session = new AioTCPSession(sessionConfig,
18 AioTCPController.this.configuration
19 .getSessionReadBufferSize(),
20 AioTCPController.this.sessionTimeout);
21 session.start();
22 registerSession(session);
23 } catch (Exception e) {
24 e.printStackTrace();
25 logger.error("Accept error", e);
26 notifyException(e);
27 } finally {
28 <strong>pendingAccept</strong>();
29 }
30 }
31
32 @Override
33 public void failed(Throwable exc, Object attachment) {
34 logger.error("Accept error", exc);
35 try {
36 notifyException(exc);
37 } finally {
38 <strong>pendingAccept</strong>();
39 }
40 }
41 }
42
注意到了吧,咱們在failed和completed方法中在最後都調用了pendingAccept來繼續發起accept調用,等待新的鏈接上來。有的同窗可能要說了,這樣搞是否是遞歸調用,會不會堆棧溢出?實際上不會,由於發起accept調用的線程與CompletionHandler回調的線程並不是同一個,不是一個上下文中,二者之間沒有耦合關係。要注意到,CompletionHandler的回調共用的是 AsynchronousChannelGroup綁定的線程池,所以千萬別在CompletionHandler回調方法中調用阻塞或者長時間的操做,例如sleep,回調方法最好能支持超時,防止線程池耗盡。
鏈接創建後,怎麼讀和寫呢?回憶下在nonblocking nio框架中,鏈接創建後的第一件事是幹什麼?註冊OP_READ事件等待socket可讀。異步IO也一樣如此,鏈接創建後立刻發起一個異步read調用,等待socket可讀,這個是Session.start方法中所作的事情:
1public class AioTCPSession {
2 protected void start0() {
3 pendingRead();
4 }
5
6 protected final void pendingRead() {
7 if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
8 if (!this.readBuffer.hasRemaining()) {
9 this.readBuffer = ByteBufferUtils
10 .increaseBufferCapatity(this.readBuffer);
11 }
12 this.readFuture = this.asynchronousSocketChannel.read(
13 this.readBuffer, this, this.readCompletionHandler);
14 } else {
15 throw new IllegalStateException(
16 "Session Or Channel has been closed");
17 }
18 }
19
20}
21
AsynchronousSocketChannel的read調用與AsynchronousServerSocketChannel的accept調用相似,一樣是非阻塞的,返回結果也是一個Future,可是寫的結果是整數,表示寫入了多少字節,所以read調用返回的是 Future,方法的第一個參數是讀的緩衝區,操做系統將IO讀到數據拷貝到這個緩衝區,第二個參數是傳遞給 CompletionHandler的attchment,第三個參數就是註冊的用於回調的CompletionHandler。這裏保存了read的結果Future,這是爲了在關閉鏈接的時候可以主動取消調用,accept也是如此。如今能夠看看read的CompletionHandler的實現:
1public final class ReadCompletionHandler implements
2 CompletionHandler<Integer, AbstractAioSession> {
3
4 private static final Logger log = LoggerFactory
5 .getLogger(ReadCompletionHandler.class);
6 protected final AioTCPController controller;
7
8 public ReadCompletionHandler(AioTCPController controller) {
9 this.controller = controller;
10 }
11
12 @Override
13 public void cancelled(AbstractAioSession session) {
14 log.warn("Session(" + session.getRemoteSocketAddress()
15 + ") read operation was canceled");
16 }
17
18 @Override
19 public void completed(Integer result, AbstractAioSession session) {
20 if (log.isDebugEnabled())
21 log.debug("Session(" + session.getRemoteSocketAddress()
22 + ") read +" + result + " bytes");
23 if (result < 0) {
24 session.close();
25 return;
26 }
27 try {
28 if (result > 0) {
29 session.updateTimeStamp();
30 session.getReadBuffer().flip();
31 session.decode();
32 session.getReadBuffer().compact();
33 }
34 } finally {
35 try {
36 session.pendingRead();
37 } catch (IOException e) {
38 session.onException(e);
39 session.close();
40 }
41 }
42 controller.checkSessionTimeout();
43 }
44
45 @Override
46 public void failed(Throwable exc, AbstractAioSession session) {
47 log.error("Session read error", exc);
48 session.onException(exc);
49 session.close();
50 }
51
52}
53
若是IO讀失敗,會返回失敗產生的異常,這種狀況下咱們就主動關閉鏈接,經過session.close()方法,這個方法幹了兩件事情:關閉channel和取消read調用:
1if (null != this.readFuture) {
2 this.readFuture.cancel(true);
3 }
4this.asynchronousSocketChannel.close();
5
在讀成功的狀況下,咱們還須要判斷結果result是否小於0,若是小於0就表示對端關閉了,這種狀況下咱們也主動關閉鏈接並返回。若是讀到必定字節,也就是result大於0的狀況下,咱們就嘗試從讀緩衝區中decode出消息,並派發給業務處理器的回調方法,最終經過pendingRead繼續發起read調用等待socket的下一次可讀。可見,咱們並不須要本身去調用channel來進行IO讀,而是操做系統幫你直接讀到了緩衝區,而後給你一個結果表示讀入了多少字節,你處理這個結果便可。而nonblocking IO框架中,是reactor通知用戶線程socket可讀了,而後用戶線程本身去調用read進行實際讀操做。這裏還有個須要注意的地方,就是decode出來的消息的派發給業務處理器工做最好交給一個線程池來處理,避免阻塞group綁定的線程池。
IO寫的操做與此相似,不過一般寫的話咱們會在session中關聯一個緩衝隊列來處理,沒有徹底寫入或者等待寫入的消息都存放在隊列中,隊列爲空的狀況下發起write調用:
1protected void write0(WriteMessage message) {
2 boolean needWrite = false;
3 synchronized (this.writeQueue) {
4 needWrite = this.writeQueue.isEmpty();
5 this.writeQueue.offer(message);
6 }
7 if (needWrite) {
8 pendingWrite(message);
9 }
10 }
11
12 protected final void pendingWrite(WriteMessage message) {
13 message = preprocessWriteMessage(message);
14 if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
15 this.asynchronousSocketChannel.write(message.getWriteBuffer(),
16 this, this.writeCompletionHandler);
17 } else {
18 throw new IllegalStateException(
19 "Session Or Channel has been closed");
20 }
21 }
22
write調用返回的結果與read同樣是一個Future,而write的CompletionHandler處理的核心邏輯大概是這樣:
1@Override
2 public void completed(Integer result, AbstractAioSession session) {
3 if (log.isDebugEnabled())
4 log.debug("Session(" + session.getRemoteSocketAddress()
5 + ") writen " + result + " bytes");
6
7 WriteMessage writeMessage;
8 Queue<WriteMessage> writeQueue = session.getWriteQueue();
9 synchronized (writeQueue) {
10 writeMessage = writeQueue.peek();
11 if (writeMessage.getWriteBuffer() == null
12 || !writeMessage.getWriteBuffer().hasRemaining()) {
13 writeQueue.remove();
14 if (writeMessage.getWriteFuture() != null) {
15 writeMessage.getWriteFuture().setResult(Boolean.TRUE);
16 }
17 try {
18 session.getHandler().onMessageSent(session,
19 writeMessage.getMessage());
20 } catch (Exception e) {
21 session.onException(e);
22 }
23 writeMessage = writeQueue.peek();
24 }
25 }
26 if (writeMessage != null) {
27 try {
28 session.pendingWrite(writeMessage);
29 } catch (IOException e) {
30 session.onException(e);
31 session.close();
32 }
33 }
34 }
35
compete方法中的result就是實際寫入的字節數,而後咱們判斷消息的緩衝區是否還有剩餘,若是沒有就將消息從隊列中移除,若是隊列中還有消息,那麼繼續發起write調用。
重複一下,這裏引用的代碼都是yanf4j aio分支中的源碼,感興趣的朋友能夠直接check out出來看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
在引入了aio以後,java對於網絡層的支持已經很是完善,該有的都有了,java也已經成爲服務器開發的首選語言之一。java的弱項在於對內存的管理上,因爲這一切都交給了GC,所以在高性能的網絡服務器上仍是Cpp的天下。java這種單一堆模型比之erlang的進程內堆模型仍是有差距,很難作到高效的垃圾回收和細粒度的內存管理。
這裏僅僅是介紹了aio開發的核心流程,對於一個網絡框架來講,還須要考慮超時的處理、緩衝buffer的處理、業務層和網絡層的切分、可擴展性、性能的可調性以及必定的通用性要求。