io.mycat.net.NIOAcceptor
NIOAcceptor負責處理客戶端(指鏈接MyCAT以訪問數據庫的程序)的鏈接請求。
NIOAcceptor中持有一個Selector字段selector,和一個ServerSocketChannel字段serverChannel。並向selector中註冊serverChannel
,並註冊感興趣的事件爲 OP_ACCEPT。
NIOAcceptor繼承了Thread接口,在run()方法內,生成了一個selector的拷貝tSeletor,並由tSeletor調用select()方法輪詢事件是否就緒。
當檢測到前段一個鏈接請求過來時,會調用serverChannel的accept()方法建立一個新的通道,並從工廠獲取一個新的前端鏈接實例(MyCAT和客戶端的鏈接稱爲前端鏈接,MyCAT和MySQL的鏈接稱爲後端鏈接),將通道和鏈接實例綁定。該鏈接實例由io.mycat.net.NIOProcessor負責管理,NIOProcessor在MyCAT中也是以NIOProcessor池的形式存在的,acceptor從池中拿出一個Processor實例,並將其和鏈接實例綁定。隨後,NIOAcceptor將該鏈接轉交給io.mycat.net.NIOReactor,
NIOAcceptor的工做就結束了。
io.mycat.net.NIOReactor
NIOReactor中聲明瞭一個final的內部類 RW,RW繼承了Runnable。RW中持有一個Selector字段selector,和一個用來保存AbstractConnection的ConcurrentLinkedQueue隊列registerQueue。在RW的run()方法中,調用selector的select()方法(該selector也是由RW的字段selector的拷貝而來,之後如非特別說明,皆是如此),輪詢 I/O 事件。
NIOReactor在接到NIOAcceptor發來的前端鏈接實例後,會將其添加到registerQueue的隊尾,並調用selector的wakeup()方法使selector馬上返回。阻塞在select()方法以後的是RW的register方法,調用register(這個方法設計的別出心裁,以後會貼出代碼),RW會從registerQueue中取出隊首的鏈接實例c,從c中獲取NIOSocketWR實例,調用NIOSocketWR的register方法,該方法從鏈接實例中獲取通道,向selector中註冊該通道,註冊感興趣的事件爲OP_READ。隨後,調用鏈接實例c的register()方法,生成認證數據,發送握手數據包,創建TCP鏈接。鏈接創建後,調用NIOSocketWR的asynRead()異步讀方法(爲何這麼作,在貼出代碼中會講述)。
前面講到RW的run()方法會循環調用selector的select()方法,除了當有新鏈接加入時,會直接返回以外,selector只有當檢測到有註冊的 OP_READ 事件就緒時纔會返回。當有通道的讀事件就緒時,RW會判斷該事件是一個「讀」事件仍是一個「寫」事件,並調用鏈接實例的相應方法處理該讀/寫事件。爲何還要判斷?由於從名字就能夠得知,這是一個讀寫複用的處理類,雖然當前咱們處理的是讀事件。
asynRead()異步讀方法:獲取緩衝區,調用通道的read方法,將數據讀到緩衝區。隨後,調用鏈接實例的處理方法處理緩衝區的字節流信息。
代碼(註釋是我加的)
/**
* 當一個鏈接由 Acceptor 轉發過來時,會使得 selector 立刻返回,調用該方法,
* 將該鏈接及其通道註冊到 selector 中, 註冊 OP_READ 事件。
* 隨後,創建 TCP 鏈接,創建 TCP 鏈接的時候也會調用異步讀取數據。
* 若此時剛好有數據到達,則可直接讀取。
* 若此時沒有數據到達,則繼續執行 select() 輪詢 I/O 事件。
*
* 當 selector 輪詢到有 I/O 事件就緒,而返回時,
* 如此時 registerQueue 是必定爲空的。
*/
private void register(Selector selector) {
AbstractConnection c = null;
if (registerQueue.isEmpty()) {
return;
}
while ((c = registerQueue.poll()) != null) {
try {
((NIOSocketWR) c.getSocketWR()).register(selector);
c.register();
} catch (Exception e) {
c.close("register err" + e.toString());
}
}
}
io.mycat.net.AbstractConnection
前面反覆提到一個詞:鏈接實例,究竟什麼是鏈接實例?客戶端發往MyCAT的每一次請求,以及MyCAT發往MySQL的每一次請求,都是一個鏈接實例。能夠把鏈接實例看做是一次請求事件的主幹,咱們都知道,NIO是一個同步非阻塞的 I/O 模型。阻塞的線程沒有作任何有意義的事情,卻依然消耗系統資源,這是咱們不能接受的,所謂非阻塞,就是不斷的在這條主幹上衍生分支,來處理複雜的業務請求,這樣主幹就不會阻塞。而同步,是指
線程不斷輪詢 IO 事件是否就緒,主幹上衍生的這些分支,都維護了一個Selector對象,Selector代替了主幹線程來執行這種輪詢,包括前面講到的acceptor和reactor;這些分支線程是以線程池的形式存在的,是能夠複用的,從而減小了頻繁建立、啓動、掛起、析構新線程的開銷,大大提高系統的併發效率。
io.mycat.net.NIOHandler
前面講到了,當數據讀到緩衝區後,調用鏈接實例的處理方法處理緩衝區的字節流。那麼,這裏是如何處理的呢?事實上,鏈接實例會先從將數據流從緩衝區讀出來,回想一下,這是一個MySQL的中間件,全部的數據都是SQL語句。因此,接下來就是對字節流形式的SQL解包。不要忘了計算機網絡的知識,端與端之間的通訊是要按照某種協議的,這就是包頭。因此,接下來的工做就是對包頭進行解壓,分析。通過這一步,MyCAT已經知道了客戶端發來的SQL語句是什麼類型的語句(登錄、增刪改查等)。而後,就會調用 NIOHandler 來處理這一條 SQL 語句。
NIOHandler和客戶端相關的實體類有:FrontendAuthenticator、FrontendCommandHandler。從名字就能夠看出,一個是負責權限驗證的,一個是負責處理命令行的。
在FrontendCommandHandler,會根據解析過的包頭,根據不一樣的SQL語句類型,調用鏈接實例的相應方法。
接下來的事情
接下來就是,在鏈接實例的細分方法中,將字節流形式的信息轉成字符串;而後就是SQL語法分析,生成語法樹;展開語法樹,計算路由節點;再接下來就是將SQL發給MySQL服務器;而後合併結果集,返回客戶端。
我不許備講語法解析和路由的部分,這不是咱們的重點。因此,如今咱們假設已經作完了這兩步,接下來要作的就是將SQL發給MySQL真正執行。
NonBlockingSession、SingleNodeHandler/MultiNodeHandler
這個時候,前端鏈接實例會調用本身NonBlockingSession類型的session字段的execute方法,execute也只作了一件事情,就是根據返回的RouteResultset是單節點的仍是多節點的,決定是調用SingleNodeHandler仍是MultiNodeHandler。session中維護了與每一個節點的後端鏈接,在nodeHandler中,會從session中取得須要的後端鏈接,而後只作了一件事,就是將本身設置爲後端鏈接實例的回調。隨後,真正的執行就交給了後端鏈接。
之因此要通過設置回調這一步,是由於nodeHandler會負責解析由MySQL發回的消息。
MySQLConnection
MySQLConnection的execute方法中,將通過解析的SQL語句從新封裝成消息包,並將該消息包加入到寫隊列中。前面出現過一個NIOSocketWR的類,由於前端的鏈接是NIO的,而MyCAT與MySQL的鏈接是由AIO實現的,所以,MySQLConnection會將把寫隊列的緩衝區寫到Channel的任務交給了AIOSocketWR,AIOSocketWR負責維護一個AsynchronousSocketChannel類型的channel對象,經過調用AsynchronousSocketChannel的:
write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
其中,傳入的attachment是AIOSocketWR自己,並實現了一個CompletionHandler類AIOWriteHandler,須要重寫兩個方法,分別是completed和failed,當寫入完成後會回調相應的方法。
到這個時候,將客戶端的命令發給MySQL的工做就作完了,接下來的就是等待MySQL返回結果了。
如何知道MySQL是否返回結果了?也是在NIOReactor(mycat對nio和aio關係處理的有點亂),nioreactor輪詢到有消息過來的時候,就會教給鏈接實例去執行異步讀方法,這個方法中又調用了socketWR的異步讀。注意這個時候鏈接實例已是後端鏈接了,因此它會調用AIOSocketWR。異步讀和異步寫相同,都是使用了Java NIO包封裝的類AsynchronousSocketChannel,調用:
read(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
在completed中,會調用connection的處理方法,而connection,則會交給以前註冊過的回調handler,就是SingleNodeHandler或MultiNodeHandler。
由於MySQL返回的包是一行一行的,所以會屢次調用異步讀方法。
SingleNodeHandler和MultiNodeHandler都是繼承了ResponseHandler,經過觀看源碼,能夠更加輕鬆地理解這種回調是如何進行的。
public interface ResponseHandler {
/**
* 沒法獲取鏈接
*
* @param e
* @param conn
*/
public void connectionError(Throwable e, BackendConnection conn);
/**
* 已得到有效鏈接的響應處理
*/
void connectionAcquired(BackendConnection conn);
/**
* 收到錯誤數據包的響應處理
*/
void errorResponse(byte[] err, BackendConnection conn);
/**
* 收到OK數據包的響應處理
*/
void okResponse(byte[] ok, BackendConnection conn);
/**
* 收到字段數據包結束的響應處理
*/
void fieldEofResponse(byte[] header, List<byte[]> fields, byte[] eof,
BackendConnection conn);
/**
* 收到行數據包的響應處理
*/
void rowResponse(byte[] row, BackendConnection conn);
/**
* 收到行數據包結束的響應處理
*/
void rowEofResponse(byte[] eof, BackendConnection conn);
/**
* 寫隊列爲空,能夠寫數據了
*
*/
void writeQueueAvailable();
/**
* on connetion close event
*/
void connectionClose(BackendConnection conn, String reason);
}