1:NIOConnector(負責連接mysql數據庫,鏈接池以數據庫爲準不以連接字符串爲準),mysql
1:NIOAcceptor,ManagerConnectionFactory(管理模塊,默認端口9066)react
2:NIOAcceptor,ServerConnectionFactory(mysql服務模塊,默認端口8066)linux
這裏介紹下管理模塊的啓動流程sql
mycat分別實現了NIO和AIO,因爲linux當前沒有真正實現AIO這裏主要介紹NIO的流程。數據庫
NIO的Reactor與AIO的Proactor兩種模式的場景區別:
下面是Reactor的作法:
1. 等待事件響應 (Reactor job)
2. 分發 「Ready-to-Read」 事件給用戶句柄 ( Reactor job)
3. 讀數據 (user handler job)
4. 處理數據( user handler job)
下面再來看看真正意義的異步模式Proactor是如何作的:
1. 等待事件響應 (Proactor job)
2. 讀數據 (Proactor job)
3. 分發 「Read-Completed」 事件給用戶句柄 (Proactor job)
4. 處理數據(user handler job)網絡
mycat的NIO實現app
Selector(選擇器)是Java NIO中可以檢測一到多個NIO通道,並可以知曉通道是否爲諸如讀寫事件作好準備的組件。這樣,一個單獨的線程能夠管理多個channel,從而管理多個網絡鏈接。
Selector能夠監聽四種不一樣類型的事件:
- Connect
- Accept
- Read
- Write
這四種事件用SelectionKey的四個常量來表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
前面已經說了,NIO採用的Reactor模式:例如汽車是乘客訪問的主體(Reactor),乘客上車後,到售票員(acceptor)處登記,以後乘客即可以休息睡覺去了,當到達乘客所要到達的目的地後,售票員將其喚醒便可。異步
核心順序socket
1:new ManagerConnectionFactory extends FrontendConnectionFactorypost
2:new NIOReactorPool,new NIOReactor,new RW中new ConcurrentLinkedQueue<AbstractConnection>()而AbstractConnection中new NIOSocketWR
3:new NIOAcceptor中向反應堆中註冊了OP_ACCEPT,該類繼承了Thread而後start啓動
channel = serverChannel.accept(); channel.configureBlocking(false); FrontendConnection c = factory.make(channel); c.setAccepted(true); c.setId(ID_GENERATOR.getId()); NIOProcessor processor = (NIOProcessor) MycatServer.getInstance() .nextProcessor(); c.setProcessor(processor); LOGGER.info("accept"); NIOReactor reactor = reactorPool.getNextReactor(); reactor.postRegister(c);
factory.make(channel):最終構造了ManagerQueryHandler(管理命令解析器)和FrontendAuthenticator(mycat權限解析器)
reactor.postRegister(c):把當前連接添加到reactor的registerQueue中並喚醒reactor的selector
在NIOReactor的registerQueue爲空的時候run循環空運轉,當上一步把accept的連接放到隊列的時候則
for (;;) { ++reactCount; try { selector.select(500L); register(selector); keys = selector.selectedKeys(); for (SelectionKey key : keys) { AbstractConnection con = null; try { Object att = key.attachment(); if (att != null) { con = (AbstractConnection) att; if (key.isValid() && key.isReadable()) { try { con.asynRead(); } catch (IOException e) { con.close("program err:" + e.toString()); continue; } catch (Exception e) { LOGGER.debug("caught err:", e); con.close("program err:" + e.toString()); continue; } } if (key.isValid() && key.isWritable()) { con.doNextWriteCheck(); } } else { key.cancel(); } } catch (CancelledKeyException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(con + " socket key canceled"); } } catch (Exception e) { LOGGER.warn(con + " " + e); } } } catch (Exception e) { LOGGER.warn(name, e); } finally { if (keys != null) { keys.clear(); } }
register(selector);也即
((NIOSocketWR) c.getSocketWR()).register(selector); 註冊OP_READ事件
c.register();即FrontendConnection的register發送握手數據包
con.asynRead();即NIOSocketWR的asynRead即
public void asynRead() throws IOException { LOGGER.info("asynRead"); ByteBuffer theBuffer = con.readBuffer; if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(); con.readBuffer = theBuffer; } int got = channel.read(theBuffer); con.onReadData(got); }
con.onReadData(got);即AbstractConnection的onReadData這裏拆包獲得完成的數據包後調用
handler.handle(data);也即FrontendAuthenticator的handle在這裏check user;check password;check schema若是失敗則將失敗信息寫入緩衝區,若是成功
則把AbstractConnection的默認hander從FrontendAuthenticator換成FrontendCommandHandler等待接下來的處理(好比show命令等,
以上的處理是發生在輸入mysql -utest -ptest -h10.97.177.83 -P9066時)
認證完成後下一次的handler.handle(data)則使用FrontendCommandHandler的handle來處理也即
public void handle(byte[] data) { if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData()) { MySQLMessage mm = new MySQLMessage(data); int packetLength = mm.readUB3(); if(packetLength+4==data.length) { source.loadDataInfileData(data); } return; } switch (data[4]) { case MySQLPacket.COM_INIT_DB: commands.doInitDB(); source.initDB(data); break; case MySQLPacket.COM_QUERY: commands.doQuery(); source.query(data); break; case MySQLPacket.COM_PING: commands.doPing(); source.ping(); break; case MySQLPacket.COM_QUIT: commands.doQuit(); source.close("quit cmd"); break; case MySQLPacket.COM_PROCESS_KILL: commands.doKill(); source.kill(data); break; case MySQLPacket.COM_STMT_PREPARE: commands.doStmtPrepare(); source.stmtPrepare(data); break; case MySQLPacket.COM_STMT_EXECUTE: commands.doStmtExecute(); source.stmtExecute(data); break; case MySQLPacket.COM_STMT_CLOSE: commands.doStmtClose(); source.stmtClose(data); break; case MySQLPacket.COM_HEARTBEAT: commands.doHeartbeat(); source.heartbeat(data); break; default: commands.doOther(); source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command"); } }
source.query(data);即queryHandler.query(sql);這裏的queryHandler是ManagerQueryHandler即
public void query(String sql) { ManagerConnection c = this.source; if (LOGGER.isDebugEnabled()) { LOGGER.debug(new StringBuilder().append(c).append(sql).toString()); } int rs = ManagerParse.parse(sql); switch (rs & 0xff) { case ManagerParse.SELECT: SelectHandler.handle(sql, c, rs >>> SHIFT); break; case ManagerParse.SET: c.write(c.writeToBuffer(OkPacket.OK, c.allocate())); break; case ManagerParse.SHOW: ShowHandler.handle(sql, c, rs >>> SHIFT); break; case ManagerParse.SWITCH: SwitchHandler.handler(sql, c, rs >>> SHIFT); break; case ManagerParse.KILL_CONN: KillConnection.response(sql, rs >>> SHIFT, c); break; case ManagerParse.OFFLINE: Offline.execute(sql, c); break; case ManagerParse.ONLINE: Online.execute(sql, c); break; case ManagerParse.STOP: StopHandler.handle(sql, c, rs >>> SHIFT); break; case ManagerParse.RELOAD: ReloadHandler.handle(sql, c, rs >>> SHIFT); break; case ManagerParse.ROLLBACK: RollbackHandler.handle(sql, c, rs >>> SHIFT); break; case ManagerParse.CLEAR: ClearHandler.handle(sql, c, rs >>> SHIFT); break; case ManagerParse.CONFIGFILE: ConfFileHandler.handle(sql, c); break; case ManagerParse.LOGFILE: ShowServerLog.handle(sql, c); break; default: c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement"); } }
mycat的網絡處理邏輯上是經過隊列加上後臺線程來實現了accept和read的解耦從而實現了高性能,可是代碼寫的就不敢恭維。