說在前面sql
本文轉自「天河聊技術」微信公衆號數據庫
sql路由這裏的內容比較多,包含單表路由或者綁定表路由、多庫多表路由、笛卡爾積路由,分三部分來介紹,今天先介紹單表或綁定表路由。微信
sql路由源碼解析app
com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine、com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine兩個sql路由引擎類,預編譯的用的比較多,咱們以預編譯的Statement的引擎類來跟蹤下sharding-jdbc是對sql怎麼進行路由的。ide
上層sql執行器接收到邏輯sql後再進行sql路由的時候會建立預編譯statement對象的路由器,所以會調用其構造器性能
/** * 預解析的SQL路由器. * * @author zhangliang */ public final class PreparedStatementRoutingEngine { // 邏輯sql private final String logicSQL; //sql路由器 private final SQLRouter sqlRouter; // sql語句對象 private SQLStatement sqlStatement; public PreparedStatementRoutingEngine(final String logicSQL, final ShardingContext shardingContext) { this.logicSQL = logicSQL; sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext); }
/** * 路由引擎工廠. * * @author zhangiang */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class SQLRouterFactory { /** * 建立SQL路由器. * * @param shardingContext 數據源運行期上下文 * @return SQL路由器 */ // 這裏是靜態工廠方法實現 public static SQLRouter createSQLRouter(final ShardingContext shardingContext) { return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext); } }
接下來會建立ParsingSQLRouter對象ui
** * 須要解析的SQL路由器. * * @author zhangiang */ public final class ParsingSQLRouter implements SQLRouter { // 分庫分表配置對象 private final ShardingRule shardingRule; // 支持的數據庫類型 private final DatabaseType databaseType; // 是否要展現sql private final boolean showSQL; private final List<Number> generatedKeys; // 上面這些屬性值都是存儲在分片上下文中 public ParsingSQLRouter(final ShardingContext shardingContext) { shardingRule = shardingContext.getShardingRule(); databaseType = shardingContext.getDatabaseType(); showSQL = shardingContext.isShowSQL(); generatedKeys = new LinkedList<>(); }
這個方法是sql路由的入口方法this
/** * SQL路由. * 當第一次路由時進行SQL解析,以後的路由複用第一次的解析結果. * * @param parameters SQL中的參數 * @return 路由結果 */ public SQLRouteResult route(final List<Object> parameters) {//sql路由業務方法 if (null == sqlStatement) { sqlStatement = sqlRouter.parse(logicSQL, parameters.size()); } return sqlRouter.route(logicSQL, parameters, sqlStatement); }
進入到這個parse方法對象
sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
@Override public SQLStatement parse(final String logicSQL, final int parametersSize) { // 建立sql解析引擎 SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule); // 開啓度量上下文 Context context = MetricsContext.start("Parse SQL"); // sql解析器解析得到sql語句對象 SQLStatement result = parsingEngine.parse(); if (result instanceof InsertStatement) { ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize); } MetricsContext.stop(context); return result; }
進入下面的sql路由方法,返回路由結果ip
return sqlRouter.route(logicSQL, parameters, sqlStatement);
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) { Collection<String> tableNames = sqlStatement.getTables().getTableNames(); RoutingEngine routingEngine; // 若是表集合是1,或者是綁定表路由就走簡單路由規則 if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) { routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//單表路由 } else { // TODO 可配置是否執行笛卡爾積 routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement); } return routingEngine.route();//tianhe TODO 笛卡爾積 }
建立簡單路由引擎 routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//單表路由
/** * 簡單路由引擎. * * @author zhangliang */ @RequiredArgsConstructor public final class SimpleRoutingEngine implements RoutingEngine { // 分庫分表配置對象 private final ShardingRule shardingRule; // sql參數 private final List<Object> parameters; // 邏輯表名 private final String logicTableName; // sql語句對象 private final SQLStatement sqlStatement;
不是單表路由,就走多庫多表路由引擎,建立多庫多表路由對象
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
/** * 混合多庫表路由引擎. * * @author gaohongtao * @author zhangliang */ @RequiredArgsConstructor @Slf4j public final class ComplexRoutingEngine implements RoutingEngine { private final ShardingRule shardingRule; private final List<Object> parameters; private final Collection<String> logicTables; private final SQLStatement sqlStatement;
return routingEngine.route();//tianhe TODO 笛卡爾積
這裏是路由邏輯,這裏有三種實現,一種是單表或者綁定表路由,一種是多庫多表路由,一種是笛卡爾積路由
單表或者綁定表路由
@Override public RoutingResult route() { // 根據邏輯表名得到表規則配置對象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根據表規則配置對象得到數據源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
根據邏輯表名得到表規則配置對象 TableRule tableRule = shardingRule.getTableRule(logicTableName);
/** * 根據邏輯表名稱查找分片規則. * * @param logicTableName 邏輯表名稱 * @return 該邏輯表的分片規則 */ public TableRule getTableRule(final String logicTableName) { // 根據邏輯表返回表規則配置對象 Optional<TableRule> tableRule = tryFindTableRule(logicTableName); if (tableRule.isPresent()) { return tableRule.get(); } // 若是默認數據源不爲空就根據默認數據源建立表配置規則對象 if (dataSourceRule.getDefaultDataSource().isPresent()) { return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule); } throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName); }
// 若是默認數據源不爲空就根據默認數據源建立表配置規則對象 if (dataSourceRule.getDefaultDataSource().isPresent()) { return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule); }
// 根據默認數據源建立部分庫數據分片策略,數據表不分表分片策略對象,並建立表配置規則對象進行裝載 private TableRule createTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) { Map<String, DataSource> defaultDataSourceMap = new HashMap<>(1); defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get()); return TableRule.builder(logicTableName) .dataSourceRule(new DataSourceRule(defaultDataSourceMap)) .databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm())) .tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build(); }
返回到這裏
@Override public RoutingResult route() { // 根據邏輯表名得到表規則配置對象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根據表規則配置對象得到數據源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
// 根據表規則配置對象得到數據源集合 Collection<String> routedDataSources = routeDataSources(tableRule);
根據分片列獲取分片值
getShardingValues(strategy.getShardingColumns());
// 根據真實的數據源名稱和分片值計算靜態分片 Collection<String> result = strategy.doStaticSharding(tableRule.getActualDatasourceNames(), shardingValues);
/** * 計算靜態分片. * * @param availableTargetNames 全部的可用分片資源集合 * @param shardingValues 分片值集合 * @return 分庫後指向的數據源名稱集合 */ public Collection<String> doStaticSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) { Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); // 若是沒有解析到傳入的數據源分片值,要走全庫路由 if (shardingValues.isEmpty()) { result.addAll(availableTargetNames); } else { // 若是傳入分片值,根據分片值去獲取具體的數據源 result.addAll(doSharding(shardingValues, availableTargetNames)); } return result; }
注意上面的數據庫路由的默認實現,若是不傳入數據庫分片值會走全庫路由的,數據量大的話是會影響性能的,因此建議必需要傳入分片值,阿里的TDDL這裏的實現是直接報錯的。
// 若是傳入分片值,根據分片值去獲取具體的數據源 result.addAll(doSharding(shardingValues, availableTargetNames));
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) { // 若是沒分片 if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) { return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next())); } // 若是按一個分片值分片 if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) { SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm; ShardingValue shardingValue = shardingValues.iterator().next(); switch (shardingValue.getType()) { case SINGLE: // = 元算符分片 return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue)); case LIST: // in運算符分片 return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue); case RANGE: // between運算符分片 return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue); default: // 如今只支持這三種運算符分片 throw new UnsupportedOperationException(shardingValue.getType().getClass().getName()); } } // 若是是多個分片值 if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) { return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues); } // 其餘方式的分片不支持 throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName()); }
返回到這裏
@Override public RoutingResult route() { // 根據邏輯表名得到表規則配置對象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根據表規則配置對象得到數據源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
根據數據源和表配置規則組裝路由map
routedMap.put(each, routeTables(tableRule, each));
下面這個方法是獲取路由的表的集合
private Collection<String> routeTables(final TableRule tableRule, final String routedDataSource) { // 獲取表分片策略 TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule); // 獲取分片值 List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns());//doDynamicSharding // 若是是動態分片走動態分片,若是是靜態分片走靜態分片 Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues) : strategy.doStaticSharding(tableRule.getActualTableNames(routedDataSource), shardingValues); Preconditions.checkState(!result.isEmpty(), "no table route info"); return result; }
/** * 計算動態分片. * * @param shardingValues 分片值集合 * @return 分庫後指向的分片資源集合 */ public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) {//doDynamicSharding Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); Collection<String> availableTargetNames = Collections.emptyList(); Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); result.addAll(doSharding(shardingValues, availableTargetNames)); return result; }
返回到這裏
@Override public RoutingResult route() { // 根據邏輯表名得到表規則配置對象 TableRule tableRule = shardingRule.getTableRule(logicTableName); // 根據表規則配置對象得到數據源集合 Collection<String> routedDataSources = routeDataSources(tableRule); Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
// 生成路由結果 return generateRoutingResult(tableRule, routedMap);
private RoutingResult generateRoutingResult(final TableRule tableRule, final Map<String, Collection<String>> routedMap) { RoutingResult result = new RoutingResult(); // 遍歷roadMap,roadMap裏面key值存儲的是數據源名稱,value值是物理數據表集合 for (Entry<String, Collection<String>> entry : routedMap.entrySet()) { // 獲取最下數據單元,每一個數據單元是一個DataNode Collection<DataNode> dataNodes = tableRule.getActualDataNodes(entry.getKey(), entry.getValue()); for (DataNode each : dataNodes) { // 組裝數據表單元裝載到路由結果中 result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName())); } } return result; }
數據模型
/** * SQL路由結果. * * @author gaohongtao * @author zhangliang */ @RequiredArgsConstructor @Getter public final class SQLRouteResult { // sql語句對象 private final SQLStatement sqlStatement; // 最小sql執行單元集合 private final Set<SQLExecutionUnit> executionUnits = new LinkedHashSet<>(); private final List<Number> generatedKeys = new LinkedList<>(); }
/** * SQL最小執行單元. * * @author gaohongtao */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public final class SQLExecutionUnit { // 具體的數據源 private final String dataSource; // 具體要執行的物理sql語句 private final String sql; }
/** * 路由表單元. * * @author zhangliang */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public final class TableUnit { // 數據源名 private final String dataSourceName; // 邏輯表名 private final String logicTableName; // 物理表名 private final String actualTableName; }
/** * 路由表單元集合. * * @author zhangliang */ @Getter @ToString public final class TableUnits { // 路由表單元集合 private final List<TableUnit> tableUnits = new LinkedList<>();
/** * 路由結果. * * @author zhangliang */ @Getter public class RoutingResult { // 表路由單元集合 private final TableUnits tableUnits = new TableUnits();
/** * 路由表單元. * * @author zhangliang */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public final class TableUnit { // 數據源名 private final String dataSourceName; // 邏輯表名 private final String logicTableName; // 物理表名 private final String actualTableName; }
/** * 分庫分表數據單元. * * @author zhangliang */ @RequiredArgsConstructor @Getter @EqualsAndHashCode @ToString public class DataNode { private static final String DELIMITER = "."; private final String dataSourceName;//數據庫名 private final String tableName;//表名
說到最後
以上介紹,僅供參考。