前段時間在看dubbo的源碼,看的差很少了也開始在寫一個RPC框架,如今寫的快一半了,纔想起來怎麼按部就班的經過文章的方式跟你們聊這個東西。因而思來想去,決定先從最基礎的服務間網絡通訊提及比較好,後面再慢慢的跟你們引出怎麼去寫一個RPC框架。編程
本篇主要跟你們聊下網咯I/O,主要是針對初學者的由淺入深系列。緩存
傳統的BIO通訊當接收到客戶端的請求時,爲每個請求建立一個新的線程進行鏈路處理,處理完成以後,經過輸出流返回給客戶端,而後線程銷燬。bash
在傳統BIO模型的基礎上,用線程池來處理客戶端的請求,防止高併發致使的server端資源被耗盡問題。服務器
不管是BIO仍是僞異步本質上都是阻塞型I/O,都是基於Stream
進行網絡數據的讀和寫。首先咱們看下InputStream
的read
方法源碼:網絡
/**
* Reads the next byte of data from the input stream. The value byte is
* returned as an <code>int</code> in the range <code>0</code> to
* <code>255</code>. If no byte is available because the end of the stream
* has been reached, the value <code>-1</code> is returned. This method
* blocks until input data is available, the end of the stream is detected,
* or an exception is thrown.
*
* <p> A subclass must provide an implementation of this method.
*
* @return the next byte of data, or <code>-1</code> if the end of the
* stream is reached.
* @exception IOException if an I/O error occurs.
*/
public abstract int read() throws IOException;
複製代碼
經過註釋能夠知道,對Socket
的輸入流進行讀取的時候,會一直髮生阻塞,直到如下3種狀況:併發
意味着當對方發送請求或者應答消息比較緩慢的時候,或者網絡傳輸比較慢的時候,讀取輸入流一方的通訊線程將被長時間阻塞。在此期間,後面的請求都得排隊。框架
在繼續看下Outputtream
的write
方法:異步
/**
* Writes the specified byte to this output stream. The general
* contract for <code>write</code> is that one byte is written
* to the output stream. The byte to be written is the eight
* low-order bits of the argument <code>b</code>. The 24
* high-order bits of <code>b</code> are ignored.
* <p>
* Subclasses of <code>OutputStream</code> must provide an
* implementation for this method.
*
* @param b the <code>byte</code>.
* @exception IOException if an I/O error occurs. In particular,
* an <code>IOException</code> may be thrown if the
* output stream has been closed.
*/
public abstract void write(int b) throws IOException;
複製代碼
當調用write
寫輸出流的時候,會發生阻塞,直到全部要發送的字節所有寫入完畢,或者發生異常。切換爲從TCP/IP
角度來理解,當消息的接收方處理比較緩慢,不能及時的從TCP
緩衝區讀取數據,這會致使發送方的TCP``window size
不斷縮小,直到爲0,雙方處於Keep-Alive
狀態,消息發送方就不能在繼續像TCP
緩衝區寫入消息,若是採用的是同步阻塞I/O,write
將會被無限期阻塞,直到window size
大於0或者發生I/O異常。ide
所以使用阻塞I/O的
Socket
和ServerSocket
在生產使用問題不少,所以NIO
誕生了,對應的是SocketChannel
和ServerSocketChannel
兩個類。高併發
傳統的BIO主要是面向流的,能夠將數據直接寫入或者讀取到Stream
對象中;而在NIO
中,讀取和寫入數據都是在緩衝區中處理的,任什麼時候候訪問NIO
中的數據,都是經過緩衝區進行的。最經常使用的緩衝區是ByteBuffer
,經常使用的緩衝區還有下面幾種:
關於Buffer
的源碼部分,因爲篇幅關係再也不囉嗦。
Channel
就是一個通道,網絡數據經過Channel
進行數據的讀取。Stream
只是在一個方向上流動,讀和寫分別在InputStream
和OutputStream
上進行,而Channel
能夠讀和寫同時進行。 實際上Channel
能夠分爲兩大類,用於網絡數據讀寫的SelectableChannel
和文件操做的FileChannel
。NIO
中的ServerSocketChannel
和SocketChannel
都是SelectableChannel
的子類。
多路複用器Selector
是NIO
的基礎,多路複用器能夠不斷地輪循註冊在其上的Channel
,若是某個Channel
發生了讀或者寫事件,那麼這個Channel
就屬於就緒狀態,就會被Selector
輪循出來,而後經過SelectionKey
能夠讀取Channel
的集合,進行後續的I/O操做。
JDK中的Selector
使用了epoll()
替代了傳統的select
,因此一個Selector
能夠同時註冊大量的Channel
,沒有傳統的鏈接句柄的限制。
NIO服務端通訊的過程大體以下:
接下來看NIO客戶端鏈路圖:
這裏就不貼server端和client端的代碼了,由於這兩部分的代碼都比較冗長。
Selector
註冊OP_CONNECT
等待後續結果,不須要像以前客戶端那樣被同步阻塞;SocketChannel
的讀寫操做都是異步的,若是沒有可讀寫的數據,直接同步返回,這樣通訊IO線程就能夠處理其餘的請求,不會被阻塞。Selector
在Linux等系統上都是經過epoll
實現,他沒有鏈接句柄的限制(上限是系統的最大句柄數或者對單個進程的句柄限制數),這意味着一個Selector
能夠處理成千上萬個鏈接請求,並且性能方面也不會有明顯的降低,所以,比較適合作高性能,高負載的服務器。JDK NIO2.0異步文件通道和異步套接字通道的實現,NIO2.0的異步套接字通道是真正意義上的異步非阻塞IO,熟悉UNIX的應該知道事件驅動I/O(AIO),相比較NIO1.0,不須要經過到多路複用器就Selector
對註冊的通道Channel
進行一個個的輪循就能夠實現異步讀寫,所以實際編程中也比較簡潔。 這裏簡單貼一下AIO
實現一個基本的服務端代碼實現。
服務器端代碼:
public static class AioServerHandler implements Runnable {
int port;
CountDownLatch latch;
AsynchronousServerSocketChannel ssc;
public AioServerHandler(int port) {
this.port = port;
try {
ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(port));
System.out.println("AioServer is started at port: " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
// 讀取請求消息
doAccept();
// 阻塞一下消息,防止線程退出
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
// CompletionHandler
ssc.accept(this, new AcceptCompletionHandler());
}
}
複製代碼
// 接收鏈接
public static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> {
// 讀取客戶端請求消息,而後將請求寫回去
@Override
public void completed(AsynchronousSocketChannel result, AioServerHandler attachment) {
// AsynchronousServerSocketChannel能夠接成千上萬的客戶端,新的鏈接將繼續調用complete方法
attachment.ssc.accept(attachment, this); // 繼續AsynchronousServerSocketChannel的accept方法,若是有新的客戶端鏈接,將繼續調用CompletionHandler的Complete方法
// 讀取消息
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AioServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown(); // 釋放服務
}
}
複製代碼
// 讀取消息和返回消息給客戶端
public static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null) {
this.channel = channel;
}
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("server接收到消息: " + req);
doWrite(String.valueOf(System.currentTimeMillis()));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String current) {
byte[] bytes = current.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 入若是沒有發送完,繼續發送
if (attachment.hasRemaining()) {
channel.write(writeBuffer, writeBuffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
複製代碼
AsynchronousServerSocketChannel
做爲一個異步的服務通道,而後綁定服務端口號。而後當AsynchronousServerSocketChannel.accept
成功的接收請求了,再經過AcceptCompletionHandler
對象來讀取請求消息。CompletionHandler
有兩個方法:
public void completed(AsynchronousSocketChannel result, AioServerHandler attachment)
public void failed(Throwable exc, AioServerHandler attachment)
completed
接口實現,讀取attachment
的AsynchronousServerSocketChannel
,而後繼續調用accept
方法,這裏接收客戶端請求已經成功了,那爲何還須要再次調用AsynchronousServerSocketChannel.accept
方法呢?
由於對於AsynchronousServerSocketChannel.accept
來講,當有新的客戶端請求的時候,系統將回調AcceptCompletionHandler.complete
方法,表示新的客戶端請求已經接收成功,因爲AsynchronousServerSocketChannel
能夠鏈接成千上萬的客戶端,所以當一個客戶端鏈接成功以後,繼續調用accept
方法以等待新的客戶端來異步鏈接AsynchronousServerSocketChannel
。
當新客戶端和服務端的鏈接創建成功以後,則須要經過AsynchronousSocketChannel.read
來異步讀取客戶端的請求消息。
@Override
public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
複製代碼
ByteBuffer dst
接收緩衝區,用於從異步Channel
中讀取數據包,A attachment
異步Channel
綁定的附件,用於通知回調的時候做爲入參使用,CompletionHandler<Integer,? super A> handler
爲異步回調接口handler。
繼續看ReadCompletionHandler
,將AsynchronousSocketChannel
傳給ReadCompletionHandler
的構造方法,主要做爲讀取半包參數和應答客戶端返回消息來用。關於半包讀寫
這裏再也不贅述,後續的RPC
入門文章會繼續說明。
這裏主要針對AsynchronousSocketChannel.write
方法進行說明:
@Override
public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
複製代碼
ByteBuffer src
和A attachment
與上面的read
方法的參數意義同樣,src
做爲AsynchronousSocketChannel
的接收緩存;attachment
做爲Channel
的綁定附件,回調的時候做爲入參使用;這裏直接實例化CompletionHandler
做爲實現write
的異步回調,當能夠寫的時候會調用complete
方法進行應答。
其實CompletionHandler
的failed
方法在實際的業務中須要注意下,須要對Throwable
進行異常判斷,若是是I/O
異常,則須要關閉鏈路釋放異常,若是是其餘的異常則能夠根據實際的業務須要進行處理。本例子中爲了簡單,就直接關閉鏈路。
這篇文章主要簡單的介紹下相關的概念,關於客戶端代碼示例這裏再也不敘述。後續的RPC
系列文章會繼續講解,歡迎關注、點贊、留言分享。