RPC框架入門篇(1)之【BIO,NIO,AIO】


前段時間在看dubbo的源碼,看的差很少了也開始在寫一個RPC框架,如今寫的快一半了,纔想起來怎麼按部就班的經過文章的方式跟你們聊這個東西。因而思來想去,決定先從最基礎的服務間網絡通訊提及比較好,後面再慢慢的跟你們引出怎麼去寫一個RPC框架。編程

本篇主要跟你們聊下網咯I/O,主要是針對初學者的由淺入深系列。緩存

傳統的BIO通訊弊端


傳統的BIO通訊當接收到客戶端的請求時,爲每個請求建立一個新的線程進行鏈路處理,處理完成以後,經過輸出流返回給客戶端,而後線程銷燬。bash

這種模型的弊端就是當併發數上漲之後,server端的線程也跟着線性增加,會帶來服務性能的急劇降低,而且可能會發生線程堆棧溢出,從而致使不能對外提供服務。

僞異步IO模型


在傳統BIO模型的基礎上,用線程池來處理客戶端的請求,防止高併發致使的server端資源被耗盡問題。服務器

僞異步IO的缺點

不管是BIO仍是僞異步本質上都是阻塞型I/O,都是基於Stream進行網絡數據的讀和寫。首先咱們看下InputStreamread方法源碼:網絡

/**
     * Reads the next byte of data from the input stream. The value byte is
     * returned as an <code>int</code> in the range <code>0</code> to
     * <code>255</code>. If no byte is available because the end of the stream
     * has been reached, the value <code>-1</code> is returned. This method
     * blocks until input data is available, the end of the stream is detected,
     * or an exception is thrown.
     *
     * <p> A subclass must provide an implementation of this method.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if an I/O error occurs.
     */
    public abstract int read() throws IOException;
複製代碼

經過註釋能夠知道,對Socket的輸入流進行讀取的時候,會一直髮生阻塞,直到如下3種狀況:併發

  • 有數據可讀
  • 可用數據已經讀取完畢
  • 發生空指針或者異常

意味着當對方發送請求或者應答消息比較緩慢的時候,或者網絡傳輸比較慢的時候,讀取輸入流一方的通訊線程將被長時間阻塞。在此期間,後面的請求都得排隊。框架

在繼續看下Outputtreamwrite方法:異步

/**
     * Writes the specified byte to this output stream. The general
     * contract for <code>write</code> is that one byte is written
     * to the output stream. The byte to be written is the eight
     * low-order bits of the argument <code>b</code>. The 24
     * high-order bits of <code>b</code> are ignored.
     * <p>
     * Subclasses of <code>OutputStream</code> must provide an
     * implementation for this method.
     *
     * @param      b   the <code>byte</code>.
     * @exception  IOException  if an I/O error occurs. In particular,
     *             an <code>IOException</code> may be thrown if the
     *             output stream has been closed.
     */
    public abstract void write(int b) throws IOException;
複製代碼

當調用write寫輸出流的時候,會發生阻塞,直到全部要發送的字節所有寫入完畢,或者發生異常。切換爲從TCP/IP角度來理解,當消息的接收方處理比較緩慢,不能及時的從TCP緩衝區讀取數據,這會致使發送方的TCP``window size不斷縮小,直到爲0,雙方處於Keep-Alive狀態,消息發送方就不能在繼續像TCP緩衝區寫入消息,若是採用的是同步阻塞I/O,write將會被無限期阻塞,直到window size大於0或者發生I/O異常。ide

所以使用阻塞I/O的SocketServerSocket在生產使用問題不少,所以NIO誕生了,對應的是SocketChannelServerSocketChannel兩個類。高併發

NIO編程介紹


NIO相關概念

Buffer

傳統的BIO主要是面向流的,能夠將數據直接寫入或者讀取到Stream對象中;而在NIO中,讀取和寫入數據都是在緩衝區中處理的,任什麼時候候訪問NIO中的數據,都是經過緩衝區進行的。最經常使用的緩衝區是ByteBuffer,經常使用的緩衝區還有下面幾種:

關於Buffer的源碼部分,因爲篇幅關係再也不囉嗦。

Channel

Channel就是一個通道,網絡數據經過Channel進行數據的讀取。Stream只是在一個方向上流動,讀和寫分別在InputStreamOutputStream上進行,而Channel能夠讀和寫同時進行。 實際上Channel能夠分爲兩大類,用於網絡數據讀寫的SelectableChannel和文件操做的FileChannelNIO中的ServerSocketChannelSocketChannel都是SelectableChannel的子類。

Selector

多路複用器SelectorNIO的基礎,多路複用器能夠不斷地輪循註冊在其上的Channel,若是某個Channel發生了讀或者寫事件,那麼這個Channel就屬於就緒狀態,就會被Selector輪循出來,而後經過SelectionKey能夠讀取Channel的集合,進行後續的I/O操做。

JDK中的Selector使用了epoll()替代了傳統的select,因此一個Selector能夠同時註冊大量的Channel,沒有傳統的鏈接句柄的限制。

NIO服務端和客戶端解基本鏈路圖

NIO服務端通訊的過程大體以下:

接下來看NIO客戶端鏈路圖:

這裏就不貼server端和client端的代碼了,由於這兩部分的代碼都比較冗長。

NIO對比BIO

  1. 鏈接操做都是異步的,能夠經過多路複用器Selector註冊OP_CONNECT等待後續結果,不須要像以前客戶端那樣被同步阻塞;
  2. SocketChannel的讀寫操做都是異步的,若是沒有可讀寫的數據,直接同步返回,這樣通訊IO線程就能夠處理其餘的請求,不會被阻塞。
  3. 因爲JDK的Selector在Linux等系統上都是經過epoll實現,他沒有鏈接句柄的限制(上限是系統的最大句柄數或者對單個進程的句柄限制數),這意味着一個Selector能夠處理成千上萬個鏈接請求,並且性能方面也不會有明顯的降低,所以,比較適合作高性能,高負載的服務器。

真正意義上的異步IO-AIO

JDK NIO2.0異步文件通道和異步套接字通道的實現,NIO2.0的異步套接字通道是真正意義上的異步非阻塞IO,熟悉UNIX的應該知道事件驅動I/O(AIO),相比較NIO1.0,不須要經過到多路複用器就Selector對註冊的通道Channel進行一個個的輪循就能夠實現異步讀寫,所以實際編程中也比較簡潔。 這裏簡單貼一下AIO實現一個基本的服務端代碼實現。

服務器端代碼:

public static class AioServerHandler implements Runnable {
        int port;
        CountDownLatch latch;
        AsynchronousServerSocketChannel ssc;
        public AioServerHandler(int port) {
            this.port = port;
            try {
                ssc = AsynchronousServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(port));
                System.out.println("AioServer is started at port: " + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
            latch = new CountDownLatch(1);
            // 讀取請求消息
            doAccept();
            // 阻塞一下消息,防止線程退出
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public void doAccept() {
            // CompletionHandler
            ssc.accept(this, new AcceptCompletionHandler());
        }
    }
複製代碼
// 接收鏈接
    public static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> {
        // 讀取客戶端請求消息,而後將請求寫回去
        @Override
        public void completed(AsynchronousSocketChannel result, AioServerHandler attachment) {
            // AsynchronousServerSocketChannel能夠接成千上萬的客戶端,新的鏈接將繼續調用complete方法
            attachment.ssc.accept(attachment, this); // 繼續AsynchronousServerSocketChannel的accept方法,若是有新的客戶端鏈接,將繼續調用CompletionHandler的Complete方法
            // 讀取消息
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            result.read(buffer, buffer, new ReadCompletionHandler(result));
        }
        @Override
        public void failed(Throwable exc, AioServerHandler attachment) {
            exc.printStackTrace();
            attachment.latch.countDown(); // 釋放服務
        }
    }
複製代碼
// 讀取消息和返回消息給客戶端
    public static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        AsynchronousSocketChannel channel;
        public ReadCompletionHandler(AsynchronousSocketChannel channel) {
            if (this.channel == null) {
                this.channel = channel;
            }
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            byte[] body = new byte[attachment.remaining()];
            attachment.get(body);

            try {
                String req = new String(body, "UTF-8");
                System.out.println("server接收到消息: " + req);
                doWrite(String.valueOf(System.currentTimeMillis()));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        private void doWrite(String current) {
            byte[] bytes = current.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 入若是沒有發送完,繼續發送
                    if (attachment.hasRemaining()) {
                        channel.write(writeBuffer, writeBuffer, this);
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

複製代碼

AIO 服務端代碼分析

AsynchronousServerSocketChannel做爲一個異步的服務通道,而後綁定服務端口號。而後當AsynchronousServerSocketChannel.accept成功的接收請求了,再經過AcceptCompletionHandler對象來讀取請求消息。CompletionHandler有兩個方法:

  1. public void completed(AsynchronousSocketChannel result, AioServerHandler attachment)
  2. public void failed(Throwable exc, AioServerHandler attachment)

completed接口實現,讀取attachmentAsynchronousServerSocketChannel,而後繼續調用accept方法,這裏接收客戶端請求已經成功了,那爲何還須要再次調用AsynchronousServerSocketChannel.accept方法呢?

由於對於AsynchronousServerSocketChannel.accept來講,當有新的客戶端請求的時候,系統將回調AcceptCompletionHandler.complete方法,表示新的客戶端請求已經接收成功,因爲AsynchronousServerSocketChannel能夠鏈接成千上萬的客戶端,所以當一個客戶端鏈接成功以後,繼續調用accept方法以等待新的客戶端來異步鏈接AsynchronousServerSocketChannel

當新客戶端和服務端的鏈接創建成功以後,則須要經過AsynchronousSocketChannel.read來異步讀取客戶端的請求消息。

@Override
    public final <A> void read(ByteBuffer dst,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler)
    {
        read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
    }
複製代碼

ByteBuffer dst接收緩衝區,用於從異步Channel中讀取數據包,A attachment異步Channel綁定的附件,用於通知回調的時候做爲入參使用,CompletionHandler<Integer,? super A> handler爲異步回調接口handler。

繼續看ReadCompletionHandler,將AsynchronousSocketChannel傳給ReadCompletionHandler的構造方法,主要做爲讀取半包參數和應答客戶端返回消息來用。關於半包讀寫這裏再也不贅述,後續的RPC入門文章會繼續說明。

這裏主要針對AsynchronousSocketChannel.write方法進行說明:

@Override
    public final <A> void write(ByteBuffer src,
                                A attachment,
                                CompletionHandler<Integer,? super A> handler)

    {
        write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
    }
複製代碼

ByteBuffer srcA attachment與上面的read方法的參數意義同樣,src做爲AsynchronousSocketChannel的接收緩存;attachment做爲Channel的綁定附件,回調的時候做爲入參使用;這裏直接實例化CompletionHandler做爲實現write的異步回調,當能夠寫的時候會調用complete方法進行應答。

其實CompletionHandlerfailed方法在實際的業務中須要注意下,須要對Throwable進行異常判斷,若是是I/O異常,則須要關閉鏈路釋放異常,若是是其餘的異常則能夠根據實際的業務須要進行處理。本例子中爲了簡單,就直接關閉鏈路。

這篇文章主要簡單的介紹下相關的概念,關於客戶端代碼示例這裏再也不敘述。後續的RPC系列文章會繼續講解,歡迎關注、點贊、留言分享。

相關文章
相關標籤/搜索