MyCat - 源代碼篇(7)

數據庫路由中間件MyCat - 源代碼篇(7)

3. 鏈接模塊

這裏寫圖片描述

3.4 FrontendConnection前端鏈接

這裏寫圖片描述
構造方法:前端

public FrontendConnection(NetworkChannel channel) throws IOException {
        super(channel);
        InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
        InetSocketAddress remoteAddr = null;
        if (channel instanceof SocketChannel) {
            remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();  

        } else if (channel instanceof AsynchronousSocketChannel) {
            remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
        }

        this.host = remoteAddr.getHostString();
        this.port = localAddr.getPort();
        this.localPort = remoteAddr.getPort();
        this.handler = new FrontendAuthenticator(this);
    }

FrontendConnection是對前端鏈接channel的封裝,接受NetworkChannel做爲參數構造。前端鏈接創建,須要先驗證其權限,因此,handler首先設置爲FrontendAuthenticator
等到驗證成功,handler會被設置成FrontendCommandHandler。
下面來看和FrontendConnection相關的Handler:
這裏寫圖片描述
FrontendCommandHandler會先解析請求類型,以後調用不一樣的方法處理不一樣類型的請求。例如,FrontendQueryHandler會解析query類型的sql請求語句:java

@Override
    public void handle(byte[] data)
    {
        if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
        {
            MySQLMessage mm = new MySQLMessage(data);
            int  packetLength = mm.readUB3();
            if(packetLength+4==data.length)
            {
                source.loadDataInfileData(data);
            }
            return;
        }
        switch (data[4])
        {
            case MySQLPacket.COM_INIT_DB:
                commands.doInitDB();
                source.initDB(data);
                break;
            case MySQLPacket.COM_QUERY:
                commands.doQuery();
                source.query(data);
                break;
            case MySQLPacket.COM_PING:
                commands.doPing();
                source.ping();
                break;
            case MySQLPacket.COM_QUIT:
                commands.doQuit();
                source.close("quit cmd");
                break;
            case MySQLPacket.COM_PROCESS_KILL:
                commands.doKill();
                source.kill(data);
                break;
            case MySQLPacket.COM_STMT_PREPARE:
                commands.doStmtPrepare();
                source.stmtPrepare(data);
                break;
            case MySQLPacket.COM_STMT_EXECUTE:
                commands.doStmtExecute();
                source.stmtExecute(data);
                break;
            case MySQLPacket.COM_STMT_CLOSE:
                commands.doStmtClose();
                source.stmtClose(data);
                break;
            case MySQLPacket.COM_HEARTBEAT:
                commands.doHeartbeat();
                source.heartbeat(data);
                break;
            default:
                     commands.doOther();
                     source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                             "Unknown command");

        }
    }

FrontendCommandHandler會調用FrontendConnection合適的方法解析處理不一樣的請求,例如它的initDB(byte[] data)方法:node

public void initDB(byte[] data) {

        MySQLMessage mm = new MySQLMessage(data);
        mm.position(5);
        String db = mm.readString();

        // 檢查schema的有效性
        if (db == null || !privileges.schemaExists(db)) {
            writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'");
            return;
        }

        if (!privileges.userExists(user, host)) {
            writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + user + "'");
            return;
        }

        Set<String> schemas = privileges.getUserSchemas(user);
        if (schemas == null || schemas.size() == 0 || schemas.contains(db)) {
            this.schema = db;
            write(writeToBuffer(OkPacket.OK, allocate()));
        } else {
            String s = "Access denied for user '" + user + "' to database '" + db + "'";
            writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
        }
    }

方法調用:
這裏寫圖片描述
經過查看能夠發現,在command packet被解析出是initDB類型的請求時(其實就是用戶發送的查詢語句爲「use XXX」),會調用此方法進行處理,同時,這些方法都是被RW線程執行的。
此方法從FrontedPrivilege中驗證用戶是否有權限訪問這個邏輯庫,若是有就把當前鏈接的邏輯庫設爲用戶請求的邏輯庫。
其餘方法與handler也是類似的關係,能夠看出,FrontendConnection組合了多種封裝的handler來處理不一樣的請求的不一樣階段。至於各類handler,會在以後sql解析,sql路由,協議實現等模塊詳細介紹。sql

3.4.1 ServerConnection服務端鏈接

前端鏈接包括ServerConnection(服務端鏈接)和ManagerConnection(管理端鏈接)。前端連接不會直接建立,而是經過工廠建立:
工廠方法:數據庫

@Override
    protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
        SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
        ServerConnection c = new ServerConnection(channel);
        MycatServer.getInstance().getConfig().setSocketParams(c, true);
        c.setPrivileges(MycatPrivileges.instance());
        c.setQueryHandler(new ServerQueryHandler(c));
        c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));
        // c.setPrepareHandler(new ServerPrepareHandler(c));
        c.setTxIsolation(sys.getTxIsolation());
        c.setSession2(new NonBlockingSession(c));
        return c;
    }

能夠看出,每一個新的ServerConnection都會綁定一個新的ServerQueryHandler負責處理sql指令,一個ServerLoadDataInfileHandler負責處理文件載入命令,一個session負責處理事務
下面是相關的類圖
這裏寫圖片描述
這裏的全部獨立的handler裏面都是static方法,可供其餘類直接調用。每一個ServerConnection都會有一個NonBlockingSession來處理。
這裏說下鏈接、會話、邏輯庫、MyCat實例的關係(與MySQL裏面的鏈接、會話、數據庫、MySQL實例的關係不太同樣);首先每一個MyCat實例都管理多個數據庫。鏈接是針對MyCat實例創建的,而且,MyCat的鏈接(AbstractConnection)是不可複用的,在close方法會關閉鏈接並清理使用的資源。可是緩存資源(buffer)是能夠複用的。好比,在一個前端鏈接長時間空閒時或者出現異常時,會被清理掉。每一個鏈接會擁有一個session來處理事務,保存會話信息。
這裏,每一個鏈接擁有一個會話。每一個鏈接中的方法,被RW線程執行,至關於與RW線程綁定。RW線程是能夠複用的,這裏至關於MySQL中的鏈接是能夠複用的(鏈接池)。
Session.java:緩存

public interface Session {

    /** * 取得源端鏈接 */
    FrontendConnection getSource();

    /** * 取得當前目標端數量 */
    int getTargetCount();

    /** * 開啓一個會話執行 */
    void execute(RouteResultset rrs, int type);

    /** * 提交一個會話執行 */
    void commit();

    /** * 回滾一個會話執行 */
    void rollback();

    /** * 取消一個正在執行中的會話 * * @param sponsor * 若是發起者爲null,則表示由本身發起。 */
    void cancel(FrontendConnection sponsor);

    /** * 終止會話,必須在關閉源端鏈接後執行該方法。 */
    void terminate();

}

下面咱們着重研究它的實現類NonBlockingSession:
首先,取得源端鏈接方法FrontendConnection getSource();,其實就是NonBlockingSession在建立時就已綁定一個鏈接,誰會調用這個方法取得源端連接呢?
這裏寫圖片描述
能夠發現,主要有各類查詢的handler還有SQLengine會去調用。由於處理不管返回什麼結果,都須要返回給源端。
int getTargetCount();取得當前目標端數量。根據目標端的數量不一樣會用不一樣的handler處理轉發SQL和合並結果。markdown

@Override
    public void execute(RouteResultset rrs, int type) {
        // 清理以前處理用的資源
        clearHandlesResources();
        if (LOGGER.isDebugEnabled()) {
            StringBuilder s = new StringBuilder();
            LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
        }

        // 檢查路由結果是否爲空
        RouteResultsetNode[] nodes = rrs.getNodes();
        if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                || nodes[0].getName().equals("")) {
            //若是爲空,則表名有誤,提示客戶端
            source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                    "No dataNode found ,please check tables defined in schema:"
                            + source.getSchema());
            return;
        }
        //若是路由結果個數爲1,則爲單點查詢或事務
        if (nodes.length == 1) {
            //使用SingleNodeHandler處理單點查詢或事務
            singleNodeHandler = new SingleNodeHandler(rrs, this);
            try {
                singleNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        } else {
            //若是路由結果>1,則爲多點查詢或事務
            boolean autocommit = source.isAutocommit();
            SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                    .getSystem();
            //mutiNodeLimitType沒有用。。。
            int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
            //使用multiNodeHandler處理多點查詢或事務
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
                    this);

            try {
                multiNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        }
    }

每次一個Session執行SQL時,會先清理handler使用的資源。SingleNodeHandler與multiNodeHandler以後會講。這裏的handler咱們以後會在每一個模塊去講,Session以後也還會提到,敬請期待session

相關文章
相關標籤/搜索