【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解

源碼分析Mybatis系列目錄:
一、源碼分析Mybatis MapperProxy初始化【圖文並茂】
二、源碼分析Mybatis MappedStatement的建立流程
三、【圖文並茂】Mybatis執行SQL的4大基礎組件詳解java

若是圖不清晰的話,能夠查看CSDN博客連接:http://www.javashuo.com/article/p-rehfkmue-mh.htmlspring

本文將詳細介紹Mybatis SQL語句執行的全流程,本文與上篇具備必定的關聯性,建議先閱讀該系列中的前面3篇文章,重點掌握Mybatis Mapper類的初始化過程,由於在Mybatis中,Mapper是執行SQL語句的入口,相似下面這段代碼:sql

1@Service
2public UserService implements IUserService {
3     @Autowired
4    private UserMapper userMapper;
5    public User findUser(Integer id) {
6        return userMapper.find(id);
7    }
8}

開始進入本文的主題,以源碼爲手段,分析Mybatis執行SQL語句的流行,而且使用了數據庫分庫分表中間件sharding-jdbc,其版本爲sharding-jdbc1.4.1。數據庫

爲了方便你們對本文的源碼分析,先給出Mybatis層面核心類的方法調用序列圖。緩存

SQL執行序列圖

【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解

源碼解析SQL執行流程

接下來從從源碼的角度對其進行剖析。微信

舒適提示:在本文的末尾,還會給出一張詳細的Mybatis Shardingjdbc語句執行流程圖。(請勿錯過哦)。mybatis

2.1 MapperProxy#invoker

1public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 2    if (Object.class.equals(method.getDeclaringClass())) {
 3      try {
 4        return method.invoke(this, args);
 5      } catch (Throwable t) {
 6        throw ExceptionUtil.unwrapThrowable(t);
 7      }
 8    }
 9    final MapperMethod mapperMethod = cachedMapperMethod(method);   // @1
10    return mapperMethod.execute(sqlSession, args);                                     // @2
11  }

代碼@1:建立並緩存MapperMethod對象。app

代碼@2:調用MapperMethod對象的execute方法,即mapperInterface中定義的每個方法最終會對應一個MapperMethod。框架

2.2 MapperMethod#execute

1public Object execute(SqlSession sqlSession, Object[] args) {
 2    Object result;
 3    if (SqlCommandType.INSERT == command.getType()) { 
 4      Object param = method.convertArgsToSqlCommandParam(args);
 5      result = rowCountResult(sqlSession.insert(command.getName(), param));
 6    } else if (SqlCommandType.UPDATE == command.getType()) {
 7      Object param = method.convertArgsToSqlCommandParam(args);
 8      result = rowCountResult(sqlSession.update(command.getName(), param));
 9    } else if (SqlCommandType.DELETE == command.getType()) {
10      Object param = method.convertArgsToSqlCommandParam(args);
11      result = rowCountResult(sqlSession.delete(command.getName(), param));
12    } else if (SqlCommandType.SELECT == command.getType()) {
13      if (method.returnsVoid() && method.hasResultHandler()) {
14        executeWithResultHandler(sqlSession, args);
15        result = null;
16      } else if (method.returnsMany()) {
17        result = executeForMany(sqlSession, args);
18      } else if (method.returnsMap()) {
19        result = executeForMap(sqlSession, args);
20      } else {
21        Object param = method.convertArgsToSqlCommandParam(args);
22        result = sqlSession.selectOne(command.getName(), param);
23      }
24    } else {
25      throw new BindingException("Unknown execution method for: " + command.getName());
26    }
27    if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) {
28      throw new BindingException("Mapper method '" + command.getName() 
29          + " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ").");
30    }
31    return result;
32  }

該方法主要是根據SQL類型,insert、update、select等操做,執行對應的邏輯,本文咱們以查詢語句,進行跟蹤,進入executeForMany(sqlSession, args)方法。運維

2.3 MapperMethod#executeForMany

1private <E> Object executeForMany(SqlSession sqlSession, Object[] args) {
 2    List<E> result;
 3    Object param = method.convertArgsToSqlCommandParam(args);
 4    if (method.hasRowBounds()) {
 5      RowBounds rowBounds = method.extractRowBounds(args);
 6      result = sqlSession.<E>selectList(command.getName(), param, rowBounds);
 7    } else {
 8      result = sqlSession.<E>selectList(command.getName(), param);
 9    }
10    // issue #510 Collections & arrays support
11    if (!method.getReturnType().isAssignableFrom(result.getClass())) {
12      if (method.getReturnType().isArray()) {
13        return convertToArray(result);
14      } else {
15        return convertToDeclaredCollection(sqlSession.getConfiguration(), result);
16      }
17    }
18    return result;
19  }

該方法也比較簡單,最終經過SqlSession調用selectList方法。

2.4 DefaultSqlSession#selectList

1public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
 2    try {
 3      MappedStatement ms = configuration.getMappedStatement(statement);   // @1
 4      List<E> result = executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);   // @2
 5      return result;
 6    } catch (Exception e) {
 7      throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
 8    } finally {
 9      ErrorContext.instance().reset();
10    }
11  }

代碼@1:根據資源名稱獲取對應的MappedStatement對象,此時的statement爲資源名稱,例如com.demo.UserMapper.findUser。至於MappedStatement對象的生成在上一節初始化時已詳細介紹過,此處再也不重複介紹。

代碼@2:調用Executor的query方法。這裏說明一下,其實一開始會進入到CachingExecutor#query方法,因爲CachingExecutor的Executor delegate屬性默認是SimpleExecutor,故最終仍是會進入到SimpleExecutor#query中。

接下來咱們進入到SimpleExecutor的父類BaseExecutor的query方法中。

2.5 BaseExecutor#query

1public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {   // @1
 2    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
 3    if (closed) throw new ExecutorException("Executor was closed.");
 4    if (queryStack == 0 && ms.isFlushCacheRequired()) {
 5      clearLocalCache();
 6    }
 7    List<E> list;
 8    try {
 9      queryStack++;
10      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;                                            // @2
11      if (list != null) {
12        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
13      } else {
14        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);                   // @3
15      }
16    } finally {
17      queryStack--;
18    }
19    if (queryStack == 0) {
20      for (DeferredLoad deferredLoad : deferredLoads) {
21        deferredLoad.load();
22      }
23      deferredLoads.clear(); // issue #601
24      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {                         // @4
25        clearLocalCache(); // issue #482
26      }
27    }
28    return list;
29  }

代碼@1:首先介紹一下該方法的入參,這些類都是Mybatis的重要類:

  • MappedStatement ms
    映射語句,一個MappedStatemnet對象表明一個Mapper中的一個方法,是映射的最基本對象。
  • Object parameter
    SQL語句的參數列表。
  • RowBounds rowBounds
    行邊界對象,其實就是分頁參數limit與size。
  • ResultHandler resultHandler
    結果處理Handler。
  • CacheKey key
    Mybatis緩存Key
  • BoundSql boundSql
    SQL與參數綁定信息,從該對象能夠獲取在映射文件中的SQL語句。

    碼@2:首先從緩存中獲取,Mybatis支持一級緩存(SqlSession)與二級緩存(多個SqlSession共享)。

代碼@3:從數據庫查詢結果,而後進入到doQuery方法,執行真正的查詢動做。

代碼@4:若是一級緩存是語句級別的,則語句執行完畢後,刪除緩存。

2.6 SimpleExecutor#doQuery

1public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
 2    Statement stmt = null;
 3    try {
 4      Configuration configuration = ms.getConfiguration();
 5      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);   // @1
 6      stmt = prepareStatement(handler, ms.getStatementLog());                                                                                                                   // @2
 7      return handler.<E>query(stmt, resultHandler);                                                                                                                                        // @3
 8    } finally {
 9      closeStatement(stmt);
10    }
11  }

代碼@1:建立StatementHandler,這裏會加入Mybatis的插件擴展機制(將在下篇詳細介紹),如圖所示:

【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解
在這裏插入圖片描述

代碼@2:建立Statement對象,注意,這裏就是JDBC協議的java.sql.Statement對象了。
代碼@3:使用Statment對象執行SQL語句。

接下來詳細介紹Statement對象的建立過程與執行過程,即分佈詳細跟蹤代碼@2與代碼@3。

Statement對象建立流程

3.1 java.sql.Connection對象建立

SimpleExecutor#prepareStatement

1private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
2    Statement stmt;
3    Connection connection = getConnection(statementLog);  // @1
4    stmt = handler.prepare(connection);                                  // @2
5    handler.parameterize(stmt);                                               // @3
6    return stmt;
7}

建立Statement對象,分紅三步:
代碼@1:建立java.sql.Connection對象。

代碼@2:使用Connection對象建立Statment對象。

代碼@3:對Statement進行額外處理,特別是PrepareStatement的參數設置(ParameterHandler)。

SimpleExecutor#getConnection

getConnection方法,根據上面流程圖所示,先是進入到SpringManagedTransaction,再經過spring-jdbc框架,利用DataSourceUtils獲取鏈接,其代碼以下:

1org.mybatis.spring.transaction.SpringManagedTransaction#doGetConnection
 2public static Connection doGetConnection(DataSource dataSource) throws SQLException {  
 3        Assert.notNull(dataSource, "No DataSource specified");
 4        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); 
 5        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
 6            conHolder.requested();
 7            if (!conHolder.hasConnection()) {
 8                conHolder.setConnection(dataSource.getConnection());
 9            }
10            return conHolder.getConnection();
11        }
12        // Else we either got no holder or an empty thread-bound holder here.
13
14        logger.debug("Fetching JDBC Connection from DataSource");
15        Connection con = dataSource.getConnection();      // @1
16
17        // 這裏省略與事務處理相關的代碼
18        return con;
19    }

代碼@1:經過DataSource獲取connection,那此處的DataSource是「誰」呢?看一下咱們工程的配置:

【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解
【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解
故最終dataSouce.getConnection獲取的鏈接,是從SpringShardingDataSource中獲取鏈接。

1com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource#getConnection
2public ShardingConnection getConnection() throws SQLException {
3        MetricsContext.init(shardingProperties);
4        return new ShardingConnection(shardingContext);
5}

返回的結果以下:

【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解

備註:這裏只是返回了一個ShardingConnection對象,該對象包含了分庫分表上下文,但此時並無執行具體的分庫操做(切換數據源)。
Connection的獲取流程清楚後,咱們繼續來看一下Statemnet對象的建立。

3.2 java.sql.Statement對象建立

1stmt = prepareStatement(handler, ms.getStatementLog());

上面語句的調用鏈:RoutingStatementHandler -》BaseStatementHand

BaseStatementHandler#prepare

3public Statement prepare(Connection connection) throws SQLException {
 4    ErrorContext.instance().sql(boundSql.getSql());
 5    Statement statement = null;
 6    try {
 7      statement = instantiateStatement(connection);    // @1
 8      setStatementTimeout(statement);                         // @2
 9      setFetchSize(statement);                                      // @3
10      return statement;
11    } catch (SQLException e) {
12      closeStatement(statement);
13      throw e;
14    } catch (Exception e) {
15      closeStatement(statement);
16      throw new ExecutorException("Error preparing statement.  Cause: " + e, e);
17    }
18  }

代碼@1:根據Connection對象(本文中是ShardingConnection)來建立Statement對象,其默認實現類:PreparedStatementHandler#instantiateStatement方法。

代碼@2:爲Statement設置超時時間。

代碼@3:設置fetchSize。

1PreparedStatementHandler#instantiateStatement
 2protected Statement instantiateStatement(Connection connection) throws SQLException {
 3    String sql = boundSql.getSql();
 4    if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
 5      String[] keyColumnNames = mappedStatement.getKeyColumns();
 6      if (keyColumnNames == null) {
 7        return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
 8      } else {
 9        return connection.prepareStatement(sql, keyColumnNames);
10      }
11    } else if (mappedStatement.getResultSetType() != null) {
12      return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY);
13    } else {
14      return connection.prepareStatement(sql);
15    }
16  }

其實Statement對象的建立,就比較簡單了,既然Connection是ShardingConnection,那就看一下其對應的prepareStatement方法便可。

ShardingConnection#prepareStatement

1
 3public PreparedStatement prepareStatement(final String sql) throws SQLException {   // sql,爲配置在mybatis xml文件中的sql語句
 4        return new ShardingPreparedStatement(this, sql);
 5}
 6ShardingPreparedStatement(final ShardingConnection shardingConnection, 
 7            final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
 8        super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability);
 9        preparedSQLRouter = shardingConnection.getShardingContext().getSqlRouteEngine().prepareSQL(sql);
10}

在構建ShardingPreparedStatement對象的時候,會根據SQL語句建立解析SQL路由的解析器對象,但此時並不會執行相關的路由計算,PreparedStatement對象建立完成後,就開始進入SQL執行流程中。

SQL執行流程

接下來咱們繼續看SimpleExecutor#doQuery方法的第3步,執行SQL語句:

1handler.<E>query(stmt, resultHandler)。

首先會進入RoutingStatementHandler這個類中,進行Mybatis層面的路由(主要是根據Statement類型)

【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解

而後進入到PreparedStatementHandler#query中。

PreparedStatementHandler#query

3public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
4    PreparedStatement ps = (PreparedStatement) statement;
5    ps.execute();  // @1
6    return resultSetHandler.<E> handleResultSets(ps);  // @2
7}

代碼@1:調用PreparedStatement的execute方法,因爲本例是使用了Sharding-jdbc分庫分表,此時調用的具體實現爲:ShardingPreparedStatement。

代碼@2:處理結果。

咱們接下來分別來跟進execute與結果處理方法。

ShardingPreparedStatement#execute

2public boolean execute() throws SQLException {
3    try {
4        return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute(); // @1
5    } finally {
6        clearRouteContext();
7    }
8}

這裏奧妙無窮,其關鍵點以下:
1)創造PreparedStatementExecutor對象,其兩個核心參數:

  • ExecutorEngine executorEngine:shardingjdbc執行引擎。
  • Collection< PreparedStatementExecutorWrapper> preparedStatemenWrappers
    一個集合,每個集合是PreparedStatement的包裝類,這個集合如何而來?
    2)preparedStatemenWrappers是經過routeSQL方法產生的。

3)最終調用PreparedStatementExecutor方法的execute來執行。

接下來分別看一下routeSQL與execute方法。

ShardingPreparedStatement#routeSQL

3private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
 4        List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
 5        SQLRouteResult sqlRouteResult = preparedSQLRouter.route(getParameters());   // @1
 6        MergeContext mergeContext = sqlRouteResult.getMergeContext();                      
 7        setMergeContext(mergeContext);
 8        setGeneratedKeyContext(sqlRouteResult.getGeneratedKeyContext());
 9        for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {                      // @2          
10            PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());     // @3
11            replayMethodsInvocation(preparedStatement);
12            getParameters().replayMethodsInvocation(preparedStatement);
13            result.add(wrap(preparedStatement, each));
14        }
15        return result;
16}

代碼@1:根據SQL參數進行路由計算,本文暫不關注其具體實現細節,這些將在具體分析Sharding-jdbc時具體詳解,在這裏就直觀看一下其結果:

代碼@二、@3:對分庫分表的結果進行遍歷,而後使用底層Datasource來建立Connection,建立PreparedStatement 對象。

routeSQL就暫時講到這,從這裏咱們得知,會在這裏根據路由結果,使用底層的具體數據源建立對應的Connection與PreparedStatement 對象。

PreparedStatementExecutor#execute

1
 3public boolean execute() {
 4    Context context = MetricsContext.start("ShardingPreparedStatement-execute");
 5    eventPostman.postExecutionEvents();
 6    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
 7    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
 8    try {
 9        if (1 == preparedStatementExecutorWrappers.size()) {     // @1
10            PreparedStatementExecutorWrapper preparedStatementExecutorWrapper = preparedStatementExecutorWrappers.iterator().next();
11            return executeInternal(preparedStatementExecutorWrapper, isExceptionThrown, dataMap);
12        }
13        List<Boolean> result = executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, Boolean>() {    // @2
14
15            @Override
16            public Boolean execute(final PreparedStatementExecutorWrapper input) throws Exception {
17                synchronized (input.getPreparedStatement().getConnection()) {
18                    return executeInternal(input, isExceptionThrown, dataMap);
19                }
20            }
21        });
22        return (null == result || result.isEmpty()) ? false : result.get(0);
23    } finally {
24        MetricsContext.stop(context);
25    }
26 }

代碼@1:若是計算出來的路由信息爲1個,則同步執行。

代碼@2:若是計算出來的路由信息有多個,則使用線程池異步執行。

那還有一個問題,經過PreparedStatement#execute方法執行後,如何返回結果呢?特別是異步執行的。

在上文其實已經談到:

DefaultResultSetHandler#handleResultSets

1
 3public List<Object> handleResultSets(Statement stmt) throws SQLException {
 4    ErrorContext.instance().activity("handling results").object(mappedStatement.getId());
 5
 6    final List<Object> multipleResults = new ArrayList<Object>();
 7
 8    int resultSetCount = 0;
 9    ResultSetWrapper rsw = getFirstResultSet(stmt);         // @1
10    //省略部分代碼,完整代碼能夠查看DefaultResultSetHandler方法。
11    return collapseSingleResultList(multipleResults);
12  }
13
14private ResultSetWrapper getFirstResultSet(Statement stmt) throws SQLException {
15    ResultSet rs = stmt.getResultSet();              // @2
16    while (rs == null) {
17      // move forward to get the first resultset in case the driver
18      // doesn't return the resultset as the first result (HSQLDB 2.1)
19      if (stmt.getMoreResults()) {
20        rs = stmt.getResultSet();
21      } else {
22        if (stmt.getUpdateCount() == -1) {
23          // no more results. Must be no resultset
24          break;
25        }
26      }
27    }
28    return rs != null ? new ResultSetWrapper(rs, configuration) : null;
29  }

咱們看一下其關鍵代碼以下:
代碼@1:調用Statement#getResultSet()方法,若是使用shardingJdbc,則會調用ShardingStatement#getResultSet(),並會處理分庫分表結果集的合併,在這裏就不詳細進行介紹,該部分會在shardingjdbc專欄詳細分析。

代碼@2:jdbc statement中獲取結果集的通用寫法,這裏也不過多的介紹。

mybatis shardingjdbc SQL執行流程就介紹到這裏了,爲了方便你們對上述流程的理解,最後給出SQL執行的流程圖:
【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解
Mybatis Sharding-Jdbc的SQL執行流程就介紹到這裏了,從圖中也能清晰看到Mybatis的拆件機制,將在下文詳細介紹。
查看更多文章請關注微信公衆號:
【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解
一波廣告來襲,做者新書《RocketMQ技術內幕》已出版上市:
【圖文並茂】源碼解析MyBatis Sharding-Jdbc SQL語句執行流程詳解

《RocketMQ技術內幕》已出版上市,目前可在主流購物平臺(京東、天貓等)購買,本書從源碼角度深度分析了RocketMQ NameServer、消息發送、消息存儲、消息消費、消息過濾、主從同步HA、事務消息;在實戰篇重點介紹了RocketMQ運維管理界面與當前支持的39個運維命令;並在附錄部分羅列了RocketMQ幾乎全部的配置參數。本書獲得了RocketMQ創始人、阿里巴巴Messaging開源技術負責人、Linux OpenMessaging 主席的高度承認並做序推薦。目前是國內第一本成體系剖析RocketMQ的書籍。

相關文章
相關標籤/搜索