1、前言java
Sharding-JDBC 是一款優秀的分庫分表框架,從3.0開始,Sharding-JDBC改名爲Sharding-Sphere,以前用Sharding-JDBC 2時,對於同庫分表而言,sql執行是串行的,由於同數據源的connection只會獲取一個,而且對於connection加上了synchronized,因此對於同庫分表而言,整個執行過程徹底是串行的。最後爲了同庫分表能夠並行,不得不爲同一個庫配置多個鏈接池。Sharding-Sphere 3.0對執行引擎進行了優化,引入內存限制模式和鏈接限制模式來動態控制並行度。算法
本篇博客主要剖析如下兩個問題:sql
一、內存限制模式和鏈接限制模式是如何控制同一數據源串行和並行的多線程
二、執行引擎優雅的設計框架
2、Sharding-Sphere的兩種模式的差異異步
內存限制模式:對於同一數據源,若是有10張分表,那麼執行時,會獲取10個鏈接並行async
鏈接限制模式:對於同一數據源,若是有10張分表,那麼執行時,只會獲取1個鏈接串行ide
控制鏈接模式的算法以下:函數
更多設計的細節能夠仔細閱讀Sharding-Sphere官網:http://shardingsphere.io/document/current/cn/features/sharding/principle/execute/優化
3、jdbc知識點回顧
對於一個龐大分庫分表框架,咱們應該從哪一個入口看進去呢?對於基於JDBC規範實現的分庫分表框架,咱們只要理一下jdbc的執行過程,就知道了這個龐大框架的脈絡,下面一塊兒來回顧jdbc的執行過程。
一、加載驅動:Class.forName()
二、獲取鏈接connection
三、由connection建立Statement或者PreparedStatement
四、用Statement或者PreparedStatement執行SQL獲取結果集
五、關閉資源,流程結束
那麼要看懂Sharding-Sphere的SQL執行過程,從Statement或者PreparedStatement看進去就夠了。
4、源碼解析
從PreparedStatement爲入口,看進去,主要有以下5個類
一、ShardingPreparedStatement 實現了PreparedStatement接口
二、PreparedStatementExecutor繼承於AbstractStatementExecutor,是SQL的執行器
三、SQLExecutePrepareTemplate用於獲取分片執行單元,以及肯定鏈接模式(內存限制模式和鏈接限制模式)
四、ShardingExecuteEngine是執行引擎,提供一個多線程的執行環境,本質上而言,ShardingExecuteEngine不作任何業務相關的事情,只是提供多線程執行環境,執行傳入的回調函數(很是巧妙的設計)
類的關係以下,一目瞭然:
接下來,咱們從ShardingPreparedStatement的executeQuery方法看進去,代碼以下:
@Override public ResultSet executeQuery() throws SQLException { ResultSet result; try { clearPrevious(); sqlRoute(); initPreparedStatementExecutor(); MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getShardingRule(), preparedStatementExecutor.executeQuery(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable()); result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergeEngine.merge(), this); } finally { clearBatch(); } currentResultSet = result; return result; }
其中,initPreparedStatementExecutor用於初始化preparedStatementExecutor,初始化作了以下操做,根據路由單元獲取statement執行單元
public void init(final SQLRouteResult routeResult) throws SQLException { setSqlType(routeResult.getSqlStatement().getType()); getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); cacheStatements(); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException { return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); }
那麼獲取statement執行單元時,是如何肯定鏈接模式的呢?getSqlExecutePrepareTemplate().getExecuteUnitGroups點進去看,SQLExecutePrepareTemplate作了什麼操做?
private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups( final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>(); int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1); List<List<SQLUnit>> sqlUnitGroups = Lists.partition(sqlUnits, desiredPartitionSize); ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitGroups.size()); int count = 0; for (List<SQLUnit> each : sqlUnitGroups) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; }
上面這段代碼就是文章開頭的公式,經過 maxConnectionsSizePerQuery來控制鏈接模式,當maxConnectionsSizePerQuery小於本數據源執行單元時,選擇鏈接限制模式,反之,則選擇內存限制模式
當preparedStatementExecutor被初始化完成,即可進行查詢
public List<QueryResult> executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException { return getQueryResult(statementExecuteUnit); } }; return executeCallback(executeCallback); }
這裏,callback是一個很是巧妙的設計,executeSQL便是須要執行的sql,這裏能夠根據須要去靈活實現,例如select、update等等操做,而executeCallback(executeCallback)即是真正的執行者,executeCallback調用sqlExecuteTemplate的executeGroup,把執行分組傳入ShardingExecuteEngine執行引擎。
@SuppressWarnings("unchecked") protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException { return sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); } public final class SQLExecuteTemplate { private final ShardingExecuteEngine executeEngine; /** * Execute group. * * @param sqlExecuteGroups SQL execute groups * @param callback SQL execute callback * @param <T> class type of return value * @return execute result * @throws SQLException SQL exception */ public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> callback) throws SQLException { return executeGroup(sqlExecuteGroups, null, callback); } /** * Execute group. * * @param sqlExecuteGroups SQL execute groups * @param firstCallback first SQL execute callback * @param callback SQL execute callback * @param <T> class type of return value * @return execute result * @throws SQLException SQL exception */ @SuppressWarnings("unchecked") public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException { try { return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback); } catch (final SQLException ex) { ExecutorExceptionHandler.handleException(ex); return Collections.emptyList(); } } }
接下來,精彩的時刻到了,執行引擎作了哪些事情呢?請繼續往下看。
public <I, O> List<O> groupExecute( final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { if (inputGroups.isEmpty()) { return Collections.emptyList(); } Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator(); ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next(); Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); } private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) { Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>(); for (ShardingExecuteGroup<I> each : inputGroups) { result.add(asyncGroupExecute(each, callback)); } return result; } private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) { final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap(); return executorService.submit(new Callable<Collection<O>>() { @Override public Collection<O> call() throws SQLException { ShardingExecuteDataMap.setDataMap(dataMap); return callback.execute(inputGroup.getInputs(), false); } }); } private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { return callback.execute(executeGroup.getInputs(), true); }
sqlExecuteTemplate調用了ShardingExecuteEngine的groupExecute,groupExecute分爲兩個主要方法,asyncGroupExecute異步執行方法和syncGroupExecute同步執行方法,乍一看,不是多線程嗎?怎麼出現了一個同步,這裏的多線程運用很是巧妙,先從執行分組中取出第一個元素firstInputs,剩下的丟進asyncGroupExecute的線程池,第一個任務讓當前線程執行,不浪費一個線程。
這裏執行引擎真正執行的是傳入的回調函數,那麼這個回調源於哪裏呢?咱們再回頭去看看PreparedStatementExecutor的executeQuery方法,回調函數由此建立。
public List<QueryResult> executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException { return getQueryResult(statementExecuteUnit); } }; return executeCallback(executeCallback); }
全部的邏輯一鼓作氣,易於擴展,設計之巧妙,可貴的好代碼。
最後,Sharding-Sphere是一個很是優秀的分庫分表框架。
---------------------------------------------------------------------------------------------------------
快樂源於分享。
此博客乃做者原創, 轉載請註明出處