mycat服務啓動{管理模塊啓動過程}

mycat啓動的時候啓動了三個模塊

1:NIOConnector(負責連接mysql數據庫,鏈接池以數據庫爲準不以連接字符串爲準),mysql

1:NIOAcceptor,ManagerConnectionFactory(管理模塊,默認端口9066)react

2:NIOAcceptor,ServerConnectionFactory(mysql服務模塊,默認端口8066)linux

這裏介紹下管理模塊的啓動流程sql

順序圖

NIO和AIO

mycat分別實現了NIO和AIO,因爲linux當前沒有真正實現AIO這裏主要介紹NIO的流程。數據庫

NIO的Reactor與AIO的Proactor兩種模式的場景區別:
下面是Reactor的作法:
1. 等待事件響應 (Reactor job)
2. 分發 「Ready-to-Read」 事件給用戶句柄 ( Reactor job)
3. 讀數據 (user handler job)
4. 處理數據( user handler job)
下面再來看看真正意義的異步模式Proactor是如何作的:
1. 等待事件響應 (Proactor job)
2. 讀數據 (Proactor job)
3. 分發 「Read-Completed」 事件給用戶句柄 (Proactor job)
4. 處理數據(user handler job)網絡

mycat的NIO實現app

Selector(選擇器)是Java NIO中可以檢測一到多個NIO通道,並可以知曉通道是否爲諸如讀寫事件作好準備的組件。這樣,一個單獨的線程能夠管理多個channel,從而管理多個網絡鏈接。
Selector能夠監聽四種不一樣類型的事件:
- Connect
- Accept
- Read
- Write
這四種事件用SelectionKey的四個常量來表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
前面已經說了,NIO採用的Reactor模式:例如汽車是乘客訪問的主體(Reactor),乘客上車後,到售票員(acceptor)處登記,以後乘客即可以休息睡覺去了,當到達乘客所要到達的目的地後,售票員將其喚醒便可。異步

 

核心順序socket

mycat管理端的啓動流程

1:new ManagerConnectionFactory extends FrontendConnectionFactorypost

2:new NIOReactorPool,new NIOReactor,new RW中new ConcurrentLinkedQueue<AbstractConnection>()而AbstractConnection中new NIOSocketWR

3:new NIOAcceptor中向反應堆中註冊了OP_ACCEPT,該類繼承了Thread而後start啓動

accept

			channel = serverChannel.accept();
			channel.configureBlocking(false);
			FrontendConnection c = factory.make(channel);
			c.setAccepted(true);
			c.setId(ID_GENERATOR.getId());
			NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
					.nextProcessor();
			c.setProcessor(processor);

			LOGGER.info("accept");

			NIOReactor reactor = reactorPool.getNextReactor();
			reactor.postRegister(c);

factory.make(channel):最終構造了ManagerQueryHandler(管理命令解析器)和FrontendAuthenticator(mycat權限解析器)

reactor.postRegister(c):把當前連接添加到reactor的registerQueue中並喚醒reactor的selector

read

在NIOReactor的registerQueue爲空的時候run循環空運轉,當上一步把accept的連接放到隊列的時候則

			for (;;) {



				++reactCount;
				try {
					selector.select(500L);
					register(selector);
					keys = selector.selectedKeys();
					for (SelectionKey key : keys) {
						AbstractConnection con = null;
						try {
							Object att = key.attachment();
							if (att != null) {
								con = (AbstractConnection) att;
								if (key.isValid() && key.isReadable()) {
									try {
										con.asynRead();
									} catch (IOException e) {
                                        con.close("program err:" + e.toString());
										continue;
									} catch (Exception e) {
										LOGGER.debug("caught err:", e);
										con.close("program err:" + e.toString());
										continue;
									}
								}
								if (key.isValid() && key.isWritable()) {
									con.doNextWriteCheck();
								}
							} else {
								key.cancel();
							}
                        } catch (CancelledKeyException e) {
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(con + " socket key canceled");
                            }
                        } catch (Exception e) {
                            LOGGER.warn(con + " " + e);
                        }
					}
				} catch (Exception e) {
					LOGGER.warn(name, e);
				} finally {
					if (keys != null) {
						keys.clear();
					}

				}

register(selector);也即

((NIOSocketWR) c.getSocketWR()).register(selector); 註冊OP_READ事件
c.register();即FrontendConnection的register發送握手數據包

con.asynRead();即NIOSocketWR的asynRead即

	public void asynRead() throws IOException {
		LOGGER.info("asynRead");
		ByteBuffer theBuffer = con.readBuffer;
		if (theBuffer == null) {
			theBuffer = con.processor.getBufferPool().allocate();
			con.readBuffer = theBuffer;
		}
		int got = channel.read(theBuffer);
		con.onReadData(got);

	}

con.onReadData(got);即AbstractConnection的onReadData這裏拆包獲得完成的數據包後調用

handler.handle(data);也即FrontendAuthenticator的handle在這裏check user;check password;check schema若是失敗則將失敗信息寫入緩衝區,若是成功

則把AbstractConnection的默認hander從FrontendAuthenticator換成FrontendCommandHandler等待接下來的處理(好比show命令等,

以上的處理是發生在輸入mysql -utest -ptest -h10.97.177.83 -P9066時)

認證完成後下一次的handler.handle(data)則使用FrontendCommandHandler的handle來處理也即

    public void handle(byte[] data)
    {
        if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
        {
            MySQLMessage mm = new MySQLMessage(data);
            int  packetLength = mm.readUB3();
            if(packetLength+4==data.length)
            {
                source.loadDataInfileData(data);
            }
            return;
        }
        switch (data[4])
        {
            case MySQLPacket.COM_INIT_DB:
                commands.doInitDB();
                source.initDB(data);
                break;
            case MySQLPacket.COM_QUERY:
                commands.doQuery();
                source.query(data);
                break;
            case MySQLPacket.COM_PING:
                commands.doPing();
                source.ping();
                break;
            case MySQLPacket.COM_QUIT:
                commands.doQuit();
                source.close("quit cmd");
                break;
            case MySQLPacket.COM_PROCESS_KILL:
                commands.doKill();
                source.kill(data);
                break;
            case MySQLPacket.COM_STMT_PREPARE:
                commands.doStmtPrepare();
                source.stmtPrepare(data);
                break;
            case MySQLPacket.COM_STMT_EXECUTE:
                commands.doStmtExecute();
                source.stmtExecute(data);
                break;
            case MySQLPacket.COM_STMT_CLOSE:
                commands.doStmtClose();
                source.stmtClose(data);
                break;
            case MySQLPacket.COM_HEARTBEAT:
                commands.doHeartbeat();
                source.heartbeat(data);
                break;
            default:
                     commands.doOther();
                     source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                             "Unknown command");

        }
    }

source.query(data);即queryHandler.query(sql);這裏的queryHandler是ManagerQueryHandler即

    public void query(String sql) {
        ManagerConnection c = this.source;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
        }
        int rs = ManagerParse.parse(sql);
        switch (rs & 0xff) {
            case ManagerParse.SELECT:
                SelectHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.SET:
                c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
                break;
            case ManagerParse.SHOW:
                ShowHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.SWITCH:
                SwitchHandler.handler(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.KILL_CONN:
                KillConnection.response(sql, rs >>> SHIFT, c);
                break;
            case ManagerParse.OFFLINE:
                Offline.execute(sql, c);
                break;
            case ManagerParse.ONLINE:
                Online.execute(sql, c);
                break;
            case ManagerParse.STOP:
                StopHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.RELOAD:
                ReloadHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.ROLLBACK:
                RollbackHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.CLEAR:
                ClearHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.CONFIGFILE:
                ConfFileHandler.handle(sql, c);
                break;
            case ManagerParse.LOGFILE:
                ShowServerLog.handle(sql, c);
                break;
            default:
                c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
        }
    }

總結

mycat的網絡處理邏輯上是經過隊列加上後臺線程來實現了accept和read的解耦從而實現了高性能,可是代碼寫的就不敢恭維。

相關文章
相關標籤/搜索