上一篇文章咱們分析了sharding-jdbc 解析select語句(sql 解析之 Select),今天咱們分析下sql路由。sql
聲明:本文基於1.5.M1版本數據庫
下面咱們以上篇文章的Select語句分析:編程
SELECT o.order_id FROM order o WHERE o.order_id = 4bash
在分析以前首先看下分庫分表的配置:post
Map<String, DataSource> dataSourceMap = new HashMap<>();
dataSourceMap.put("ds_0", null);
dataSourceMap.put("ds_1", null);
DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
TableRule orderTableRule = TableRule.builder("order").actualTables(Lists.newArrayList("order_0", "order_1")).dataSourceRule(dataSourceRule).build();
TableRule orderItemTableRule = TableRule.builder("order_item").actualTables(Lists.newArrayList("order_item_0", "order_item_1")).dataSourceRule(dataSourceRule).build();
TableRule orderAttrTableRule = TableRule.builder("order_attr").actualTables(Lists.newArrayList("ds_0.order_attr_a", "ds_1.order_attr_b")).dataSourceRule(dataSourceRule)
.tableShardingStrategy(new TableShardingStrategy("order_id", new OrderAttrShardingAlgorithm())).build();
shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule).tableRules(Lists.newArrayList(orderTableRule, orderItemTableRule, orderAttrTableRule))
.bindingTableRules(Collections.singletonList(new BindingTableRule(Arrays.asList(orderTableRule, orderItemTableRule))))
.databaseShardingStrategy(new DatabaseShardingStrategy("order_id", new OrderShardingAlgorithm()))
.tableShardingStrategy(new TableShardingStrategy("order_id", new OrderShardingAlgorithm())).build();
複製代碼
order表分了2個庫,2個表,以order_id爲分片鍵ui
public StatementRoutingEngine(final ShardingContext shardingContext) {
sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext);
}
/**
* SQL路由.
*
* @param logicSQL 邏輯SQL
* @return 路由結果
*/
public SQLRouteResult route(final String logicSQL) {
SQLStatement sqlStatement = sqlRouter.parse(logicSQL, 0);
return sqlRouter.route(logicSQL, Collections.emptyList(), sqlStatement);
}
複製代碼
這裏判斷是否只分庫,若只分庫,則new UnparsingSQLRouter,不須要走SQL解析的邏輯(直接落到具體的庫,執行SQL便可),不然new ParsingSQLRouterspa
/**
* 建立SQL路由器.
*
* @param shardingContext 數據源運行期上下文
* @return SQL路由器
*/
public static SQLRouter createSQLRouter(final ShardingContext shardingContext) {
return HintManagerHolder.isDatabaseShardingOnly() ? new UnparsingSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext);
}
複製代碼
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
final Context context = MetricsContext.start("Route SQL");
SQLRouteResult result = new SQLRouteResult(sqlStatement);
if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
//insert 語句處理主鍵(有空分析分析)
processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
}
//路由
RoutingResult routingResult = route(parameters, sqlStatement);
...(後面是重寫的邏輯,先省略)
MetricsContext.stop(context);
logSQLRouteResult(result, parameters);
return result;
}
複製代碼
若單表,走SimpleRoutingEngine#route,不然走ComplexRoutingEngine#routecode
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
Collection<String> tableNames = sqlStatement.getTables().getTableNames();
RoutingEngine routingEngine;
if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) {
routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
} else {
// TODO 可配置是否執行笛卡爾積
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
}
return routingEngine.route();
}
複製代碼
正常狀況下都是單表,咱們就以單表的狀況分析cdn
public RoutingResult route() {
TableRule tableRule = shardingRule.getTableRule(logicTableName);
Collection<String> routedDataSources = routeDataSources(tableRule);
Collection<String> routedTables = routeTables(tableRule, routedDataSources);
return generateRoutingResult(tableRule, routedDataSources, routedTables);
}
複製代碼
/**
* 根據邏輯表名稱查找分片規則.
*
* @param logicTableName 邏輯表名稱
* @return 該邏輯表的分片規則
*/
public TableRule getTableRule(final String logicTableName) {
Optional<TableRule> tableRule = tryFindTableRule(logicTableName);
if (tableRule.isPresent()) {
return tableRule.get();
}
if (dataSourceRule.getDefaultDataSource().isPresent()) {
return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule);
}
throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName);
}
複製代碼
private TableRule createTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) {
Map<String, DataSource> defaultDataSourceMap = new HashMap<>(1);
defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get());
return TableRule.builder(logicTableName)
.dataSourceRule(new DataSourceRule(defaultDataSourceMap))
.databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm()))
.tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build();
}
複製代碼
private Collection<String> routeDataSources(final TableRule tableRule) {
一、根據TableRule 獲取數據庫分片策略
DatabaseShardingStrategy strategy = shardingRule.getDatabaseShardingStrategy(tableRule);
二、判斷有沒有用強制路由,有的話直接用強制路由的value,沒有的話就用咱們查詢條件裏面用到的分片value
List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getDatabaseShardingValuesFromHint(strategy.getShardingColumns())
: getShardingValues(strategy.getShardingColumns());
logBeforeRoute("database", logicTableName, tableRule.getActualDatasourceNames(), strategy.getShardingColumns(), shardingValues);
三、調用分片策略計算分片值
Collection<String> result = strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualDatasourceNames(), shardingValues);
logAfterRoute("database", logicTableName, result);
Preconditions.checkState(!result.isEmpty(), "no database route info");
return result;
}
複製代碼
private Collection<String> routeTables(final TableRule tableRule, final Collection<String> routedDataSources) {
TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule);
List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns())
: getShardingValues(strategy.getShardingColumns());
logBeforeRoute("table", logicTableName, tableRule.getActualTables(), strategy.getShardingColumns(), shardingValues);
Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues)
: strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualTableNames(routedDataSources), shardingValues);
logAfterRoute("table", logicTableName, result);
Preconditions.checkState(!result.isEmpty(), "no table route info");
return result;
}
複製代碼
強制路由的感受能夠單獨寫一篇文章說,因此就不分析了,之後寫,咱們看不走強制路由的邏輯。對象
private List<ShardingValue<?>> getShardingValues(final Collection<String> shardingColumns) {
List<ShardingValue<?>> result = new ArrayList<>(shardingColumns.size());
for (String each : shardingColumns) {
//SQL解析的getConditions對象(上一篇文章簡單分析過),這裏查找分片列是否存在,存在就轉換爲ShardingValue
Optional<Condition> condition = sqlStatement.getConditions().find(new Column(each, logicTableName));
if (condition.isPresent()) {
result.add(condition.get().getShardingValue(parameters));
}
}
return result;
}
複製代碼
operator:這個能夠理解爲條件對象的操做符號(=、in、between)
/**
* 將條件對象轉換爲分片值.
*
* @param parameters 參數列表
* @return 分片值
*/
public ShardingValue<?> getShardingValue(final List<Object> parameters) {
List<Comparable<?>> conditionValues = getValues(parameters);
switch (operator) {
case EQUAL:
return new ShardingValue<Comparable<?>>(column.getTableName(), column.getName(), conditionValues.get(0));
case IN:
return new ShardingValue<>(column.getTableName(), column.getName(), conditionValues);
case BETWEEN:
return new ShardingValue<>(column.getTableName(), column.getName(), Range.range(conditionValues.get(0), BoundType.CLOSED, conditionValues.get(1), BoundType.CLOSED));
default:
throw new UnsupportedOperationException(operator.getExpression());
}
}
複製代碼
positionValueMap:存放條件值(分片Value),positionIndexMap:這段邏輯彷佛沒太看懂。。
private List<Comparable<?>> getValues(final List<Object> parameters) {
List<Comparable<?>> result = new LinkedList<>(positionValueMap.values());
for (Entry<Integer, Integer> entry : positionIndexMap.entrySet()) {
Object parameter = parameters.get(entry.getValue());
if (!(parameter instanceof Comparable<?>)) {
throw new ShardingJdbcException("Parameter `%s` should extends Comparable for sharding value.", parameter);
}
if (entry.getKey() < result.size()) {
result.add(entry.getKey(), (Comparable<?>) parameter);
} else {
result.add((Comparable<?>) parameter);
}
}
return result;
}
複製代碼
/**
* 計算靜態分片.
*
* @param sqlType SQL語句的類型
* @param availableTargetNames 全部的可用分片資源集合
* @param shardingValues 分片值集合
* @return 分庫後指向的數據源名稱集合
*/
public Collection<String> doStaticSharding(final SQLType sqlType, final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
if (shardingValues.isEmpty()) {
Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames), "INSERT statement should contain sharding value.");
result.addAll(availableTargetNames);
} else {
result.addAll(doSharding(shardingValues, availableTargetNames));
}
return result;
}
複製代碼
通常咱們的表實現SingleKeyShardingAlgorithm類,定義咱們本身的分片邏輯,返回計算出來的結果值
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {
if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
}
if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
ShardingValue shardingValue = shardingValues.iterator().next();
switch (shardingValue.getType()) {
case SINGLE:
return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
case LIST:
return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
case RANGE:
return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
default:
throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());
}
}
if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {
return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
}
throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());
}
複製代碼
過濾獲取真實的DataNode,組裝RoutingResult
private RoutingResult generateRoutingResult(final TableRule tableRule, final Collection<String> routedDataSources, final Collection<String> routedTables) {
RoutingResult result = new RoutingResult();
for (DataNode each : tableRule.getActualDataNodes(routedDataSources, routedTables)) {
result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName()));
}
return result;
}
複製代碼
小尾巴走一波,歡迎關注個人公衆號,不按期分享編程、投資、生活方面的感悟:)