🙂🙂🙂關注微信公衆號:【芋道源碼】有福利:java
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文主要基於 Sharding-JDBC 1.5.0 正式版git
本文分享分表分庫路由相關的實現。涉及內容以下:github
內容順序如編號。算法
Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門sql
SQL 路由大致流程以下:數據庫
通過 SQL解析、SQL路由後,產生SQL路由結果,即 SQLRouteResult。根據路由結果,生成SQL,執行SQL。編程
sqlStatement
:SQL語句對象,通過SQL解析的結果對象。executionUnits
:SQL最小執行單元集合。SQL執行時,執行每一個單元。generatedKeys
:插入SQL語句生成的主鍵編號集合。目前不支持批量插入而使用集合的緣由,猜想是爲了將來支持批量插入作準備。ShardingStrategy,分片策略。目前支持兩種分片:api
分片資源:在分庫策略裏指的是庫,在分表策略裏指的是表。微信
【1】 計算靜態分片(經常使用)app
// ShardingStrategy.java /** * 計算靜態分片. * @param sqlType SQL語句的類型 * @param availableTargetNames 全部的可用分片資源集合 * @param shardingValues 分片值集合 * @return 分庫後指向的數據源名稱集合 */ public Collection<String> doStaticSharding(final SQLType sqlType, final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) { Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); if (shardingValues.isEmpty()) { Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames), "INSERT statement should contain sharding value."); // 插入不能有多資源對象 result.addAll(availableTargetNames); } else { result.addAll(doSharding(shardingValues, availableTargetNames)); } return result; } /** * 插入SQL 是否插入多個分片 * @param sqlType SQL類型 * @param availableTargetNames 全部的可用分片資源集合 * @return 是否 */ private boolean isInsertMultiple(final SQLType sqlType, final Collection<String> availableTargetNames) { return SQLType.INSERT == sqlType && availableTargetNames.size() > 1; }
【2】計算動態分片
// ShardingStrategy.java /** * 計算動態分片. * @param shardingValues 分片值集合 * @return 分庫後指向的分片資源集合 */ public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) { Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); // 動態分片必須有分片值 Collection<String> availableTargetNames = Collections.emptyList(); Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); result.addAll(doSharding(shardingValues, availableTargetNames)); return result; }
TableRule.dynamic=true
😈 悶了,看起來二者沒啥區別?答案在分片算法上。咱們先看 #doSharding()
方法的實現。
// ShardingStrategy.java /** * 計算分片 * @param shardingValues 分片值集合 * @param availableTargetNames 全部的可用分片資源集合 * @return 分庫後指向的分片資源集合 */ private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) { // 無片鍵 if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) { return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next())); } // 單片鍵 if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) { SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm; ShardingValue shardingValue = shardingValues.iterator().next(); switch (shardingValue.getType()) { case SINGLE: return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue)); case LIST: return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue); case RANGE: return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue); default: throw new UnsupportedOperationException(shardingValue.getType().getClass().getName()); } } // 多片鍵 if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) { return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues); } throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName()); }
public interface NoneKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm { String doSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue); }
public interface SingleKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm { String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue); Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue); Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue); }
ShardingValueType | SQL 操做符 | 接口方法 |
---|---|---|
SINGLE | = | #doEqualSharding() |
LIST | IN | #doInSharding() |
RANGE | BETWEEN | #doBetweenSharding() |
public interface MultipleKeysShardingAlgorithm extends ShardingAlgorithm { Collection<String> doSharding(Collection<String> availableTargetNames, Collection<ShardingValue<?>> shardingValues); }
分片算法類結構以下:
來看看 Sharding-JDBC 實現的無需分庫的分片算法 NoneDatabaseShardingAlgorithm (NoneTableShardingAlgorithm 基本如出一轍):
public final class NoneDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<String>, MultipleKeysDatabaseShardingAlgorithm { @Override public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) { return availableTargetNames; } @Override public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) { return availableTargetNames.isEmpty() ? null : availableTargetNames.iterator().next(); } @Override public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) { return availableTargetNames; } @Override public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) { return availableTargetNames; } }
#doEqualSharding()
返回的是第一個分片資源。再來看測試目錄下實現的餘數基偶分表算法 ModuloTableShardingAlgorithm 的實現:
// com.dangdang.ddframe.rdb.integrate.fixture.ModuloTableShardingAlgorithm.java public final class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> { @Override public String doEqualSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) { for (String each : tableNames) { if (each.endsWith(shardingValue.getValue() % 2 + "")) { return each; } } throw new UnsupportedOperationException(); } @Override public Collection<String> doInSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(tableNames.size()); for (Integer value : shardingValue.getValues()) { for (String tableName : tableNames) { if (tableName.endsWith(value % 2 + "")) { result.add(tableName); } } } return result; } @Override public Collection<String> doBetweenSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(tableNames.size()); Range<Integer> range = shardingValue.getValueRange(); for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) { for (String each : tableNames) { if (each.endsWith(i % 2 + "")) { result.add(each); } } } return result; } }
😈 來看看動態計算分片須要怎麼實現分片算法。
// com.dangdang.ddframe.rdb.integrate.fixture.SingleKeyDynamicModuloTableShardingAlgorithm.java public final class SingleKeyDynamicModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> { /** * 表前綴 */ private final String tablePrefix; @Override public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) { return tablePrefix + shardingValue.getValue() % 10; } @Override public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(shardingValue.getValues().size()); for (Integer value : shardingValue.getValues()) { result.add(tablePrefix + value % 10); } return result; } @Override public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) { Collection<String> result = new LinkedHashSet<>(availableTargetNames.size()); Range<Integer> range = shardingValue.getValueRange(); for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) { result.add(tablePrefix + i % 10); } return result; } }
SQLRouter,SQL 路由器接口,共有兩種實現:
它們實現 #parse()
進行SQL解析,#route()
進行SQL路由。
RoutingEngine,路由引擎接口,共有四種實現:
ComplexRoutingEngine 根據路由結果會轉化成 SimpleRoutingEngine 或 ComplexRoutingEngine。下文會看相應源碼。
路由結果有兩種:
從圖中,咱們已經能大概看到二者有什麼區別,更具體的下文隨源碼一塊兒分享。
😈 SQLRouteResult 和 RoutingResult 有什麼區別?
一會兒看到這麼多"對象",可能有點緊張。沒關係張,咱們一塊兒在整理下。
路由器 | 路由引擎 | 路由結果 |
---|---|---|
DatabaseHintSQLRouter | DatabaseHintRoutingEngine | RoutingResult |
ParsingSQLRouter | SimpleRoutingEngine | RoutingResult |
ParsingSQLRouter | CartesianRoutingEngine | CartesianRoutingResult |
😈 逗比博主給你們解決了"對象",是否是應該分享朋友圈。
DatabaseHintSQLRouter,基於數據庫提示的路由引擎。路由器工廠 SQLRouterFactory 建立路由器時,判斷到使用數據庫提示( Hint ) 時,建立 DatabaseHintSQLRouter。
// DatabaseHintRoutingEngine.java public static SQLRouter createSQLRouter(final ShardingContext shardingContext) { return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext); }
先來看下 HintManagerHolder、HintManager 部分相關的代碼:
// HintManagerHolder.java public final class HintManagerHolder { /** * HintManager 線程變量 */ private static final ThreadLocal<HintManager> HINT_MANAGER_HOLDER = new ThreadLocal<>(); /** * 判斷是否當前只分庫. * * @return 是否當前只分庫. */ public static boolean isDatabaseShardingOnly() { return null != HINT_MANAGER_HOLDER.get() && HINT_MANAGER_HOLDER.get().isDatabaseShardingOnly(); } /** * 清理線索分片管理器的本地線程持有者. */ public static void clear() { HINT_MANAGER_HOLDER.remove(); } } // HintManager.java public final class HintManager implements AutoCloseable { /** * 庫分片值集合 */ private final Map<ShardingKey, ShardingValue<?>> databaseShardingValues = new HashMap<>(); /** * 只作庫分片 * {@link DatabaseHintRoutingEngine} */ @Getter private boolean databaseShardingOnly; /** * 獲取線索分片管理器實例. * * @return 線索分片管理器實例 */ public static HintManager getInstance() { HintManager result = new HintManager(); HintManagerHolder.setHintManager(result); return result; } /** * 設置分庫分片值. * * <p>分片操做符爲等號.該方法適用於只分庫的場景</p> * * @param value 分片值 */ public void setDatabaseShardingValue(final Comparable<?> value) { databaseShardingOnly = true; addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME, value); } }
那麼若是要使用 DatabaseHintSQLRouter,咱們只須要 HintManager.getInstance().setDatabaseShardingValue(庫分片值)
便可。這裏有兩點要注意下:
HintManager#getInstance()
,每次獲取到的都是新的 HintManager,屢次賦值須要當心。HintManager#close()
,使用完須要去清理,避免下個請求讀到遺漏的線程變量。看看 DatabaseHintSQLRouter 的實現:
// DatabaseHintSQLRouter.java @Override public SQLStatement parse(final String logicSQL, final int parametersSize) { return new SQLJudgeEngine(logicSQL).judge(); // 只解析 SQL 類型 } @Override // TODO insert的SQL仍然須要解析自增主鍵 public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) { Context context = MetricsContext.start("Route SQL"); SQLRouteResult result = new SQLRouteResult(sqlStatement); // 路由 RoutingResult routingResult = new DatabaseHintRoutingEngine(shardingRule.getDataSourceRule(), shardingRule.getDatabaseShardingStrategy(), sqlStatement.getType()) .route(); // SQL最小執行單元 for (TableUnit each : routingResult.getTableUnits().getTableUnits()) { result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), logicSQL)); } MetricsContext.stop(context); if (showSQL) { SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters); } return result; }
#parse()
只解析了 SQL 類型,即 SELECT / UPDATE / DELETE / INSERT 。actualTables
屬性也是沒有效果的。TODO
應該會支持。HintManager.getInstance().setDatabaseShardingValue(庫分片值)
設置的庫分片值使用的是 EQUALS,於是分庫策略計算出來的只有一個庫分片,即 TableUnit 只有一個,SQLExecutionUnit 只有一個。看看 DatabaseHintSQLRouter 的實現:
// DatabaseHintRoutingEngine.java @Override public RoutingResult route() { // 從 Hint 得到 分片鍵值 Optional<ShardingValue<?>> shardingValue = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME)); Preconditions.checkState(shardingValue.isPresent()); log.debug("Before database sharding only db:{} sharding values: {}", dataSourceRule.getDataSourceNames(), shardingValue.get()); // 路由。表分片規則使用的是 ShardingRule 裏的。由於沒 SQL 解析。 Collection<String> routingDataSources = databaseShardingStrategy.doStaticSharding(sqlType, dataSourceRule.getDataSourceNames(), Collections.<ShardingValue<?>>singleton(shardingValue.get())); Preconditions.checkState(!routingDataSources.isEmpty(), "no database route info"); log.debug("After database sharding only result: {}", routingDataSources); // 路由結果 RoutingResult result = new RoutingResult(); for (String each : routingDataSources) { result.getTableUnits().getTableUnits().add(new TableUnit(each, "", "")); } return result; }
databaseShardingStrategy.doStaticSharding()
方法計算庫分片。new TableUnit(each, "", "")
的 logicTableName
,actualTableName
都是空串,相信緣由你已經知道。ParsingSQLRouter,須要解析的SQL路由器。
ParsingSQLRouter 使用 SQLParsingEngine 解析SQL。對SQL解析有興趣的同窗能夠看看拙做《Sharding-JDBC 源碼分析 —— SQL 解析》。
// ParsingSQLRouter.java public SQLStatement parse(final String logicSQL, final int parametersSize) { SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule); Context context = MetricsContext.start("Parse SQL"); SQLStatement result = parsingEngine.parse(); if (result instanceof InsertStatement) { ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize); } MetricsContext.stop(context); return result; }
#appendGenerateKeyToken()
會在《SQL 改寫》分享ParsingSQLRouter 在路由時,會根據表狀況使用 SimpleRoutingEngine 或 CartesianRoutingEngine 進行路由。
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) { Collection<String> tableNames = sqlStatement.getTables().getTableNames(); RoutingEngine routingEngine; if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) { routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement); } else { // TODO 可配置是否執行笛卡爾積 routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement); } return routingEngine.route(); }
tableNames.iterator().next()
注意下,tableNames
變量是 new TreeMap<>(String.CASE_INSENSITIVE_ORDER)
。因此 SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id
即便 t_order_item
排在 t_order
前面,tableNames.iterator().next()
返回的是 t_order
。當 t_order
和 t_order_item
爲 BindingTable關係 時,計算的是 t_order
路由分片。tableRules
配置。配置該關係 TableRule 有以下須要遵照的規則:
舉個例子:
SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id
multi_db_multi_table_01 ├── t_order_0 ├── t_order_item_01 └── t_order_1 ├── t_order_item_02 ├── t_order_item_03 ├── t_order_item_04 multi_db_multi_table_02 ├── t_order_0 ├── t_order_item_01 └── t_order_1 ├── t_order_item_02 ├── t_order_item_03 ├── t_order_item_04
最終執行的SQL以下:
SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
t_order_item_03
、t_order_item_04
沒法被查詢到。下面咱們看看 #isAllBindingTables()
如何實現多表互爲BindingTable關係。
// ShardingRule.java // 調用順序 #isAllBindingTables()=>#filterAllBindingTables()=>#findBindingTableRule()=>#findBindingTableRule() /** * 判斷邏輯表名稱集合是否所有屬於Binding表. * @param logicTables 邏輯表名稱集合 */ public boolean isAllBindingTables(final Collection<String> logicTables) { Collection<String> bindingTables = filterAllBindingTables(logicTables); return !bindingTables.isEmpty() && bindingTables.containsAll(logicTables); } /** * 過濾出全部的Binding表名稱. */ public Collection<String> filterAllBindingTables(final Collection<String> logicTables) { if (logicTables.isEmpty()) { return Collections.emptyList(); } Optional<BindingTableRule> bindingTableRule = findBindingTableRule(logicTables); if (!bindingTableRule.isPresent()) { return Collections.emptyList(); } // 交集 Collection<String> result = new ArrayList<>(bindingTableRule.get().getAllLogicTables()); result.retainAll(logicTables); return result; } /** * 得到包含<strong>任一</strong>在邏輯表名稱集合的binding表配置的邏輯表名稱集合 */ private Optional<BindingTableRule> findBindingTableRule(final Collection<String> logicTables) { for (String each : logicTables) { Optional<BindingTableRule> result = findBindingTableRule(each); if (result.isPresent()) { return result; } } return Optional.absent(); } /** * 根據邏輯表名稱獲取binding表配置的邏輯表名稱集合. */ public Optional<BindingTableRule> findBindingTableRule(final String logicTable) { for (BindingTableRule each : bindingTableRules) { if (each.hasLogicTable(logicTable)) { return Optional.of(each); } } return Optional.absent(); }
[a, b, c]
,而不能是 [a, b], [b, c]
。SimpleRoutingEngine,簡單路由引擎。
// SimpleRoutingEngine.java private Collection<String> routeDataSources(final TableRule tableRule) { DatabaseShardingStrategy strategy = shardingRule.getDatabaseShardingStrategy(tableRule); List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getDatabaseShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns()); Collection<String> result = strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualDatasourceNames(), shardingValues); Preconditions.checkState(!result.isEmpty(), "no database route info"); return result; } private List<ShardingValue<?>> getShardingValues(final Collection<String> shardingColumns) { List<ShardingValue<?>> result = new ArrayList<>(shardingColumns.size()); for (String each : shardingColumns) { Optional<Condition> condition = sqlStatement.getConditions().find(new Column(each, logicTableName)); if (condition.isPresent()) { result.add(condition.get().getShardingValue(parameters)); } } return result; }
#getShardingValues()
咱們看到了《SQL 解析(二)之SQL解析》分享的 Condition 對象。以前咱們提到過Parser 半理解SQL的目的之一是:提煉分片上下文,此處便是該目的的體現。Condition 裏只放明確影響路由的條件,例如:order_id = 1
, order_id IN (1, 2)
, order_id BETWEEN (1, 3)
,不放沒法計算的條件,例如:o.order_id = i.order_id
。該方法裏,使用分片鍵從 Condition 查找 分片值。🙂 是否是對 Condition 的認識更加清晰一丟丟落。// SimpleRoutingEngine.java private Collection<String> routeTables(final TableRule tableRule, final Collection<String> routedDataSources) { TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule); List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns()); Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues) : strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualTableNames(routedDataSources), shardingValues); Preconditions.checkState(!result.isEmpty(), "no table route info"); return result; }
dynamic
屬性來判斷調用 #doDynamicSharding()
仍是 #doStaticSharding()
計算分片。// SimpleRoutingEngine.java private RoutingResult generateRoutingResult(final TableRule tableRule, final Collection<String> routedDataSources, final Collection<String> routedTables) { RoutingResult result = new RoutingResult(); for (DataNode each : tableRule.getActualDataNodes(routedDataSources, routedTables)) { result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName())); } return result; } // TableRule.java /** * 根據數據源名稱過濾獲取真實數據單元. * @param targetDataSources 數據源名稱集合 * @param targetTables 真實表名稱集合 * @return 真實數據單元 */ public Collection<DataNode> getActualDataNodes(final Collection<String> targetDataSources, final Collection<String> targetTables) { return dynamic ? getDynamicDataNodes(targetDataSources, targetTables) : getStaticDataNodes(targetDataSources, targetTables); } private Collection<DataNode> getDynamicDataNodes(final Collection<String> targetDataSources, final Collection<String> targetTables) { Collection<DataNode> result = new LinkedHashSet<>(targetDataSources.size() * targetTables.size()); for (String targetDataSource : targetDataSources) { for (String targetTable : targetTables) { result.add(new DataNode(targetDataSource, targetTable)); } } return result; } private Collection<DataNode> getStaticDataNodes(final Collection<String> targetDataSources, final Collection<String> targetTables) { Collection<DataNode> result = new LinkedHashSet<>(actualTables.size()); for (DataNode each : actualTables) { if (targetDataSources.contains(each.getDataSourceName()) && targetTables.contains(each.getTableName())) { result.add(each); } } return result; }
ComplexRoutingEngine,混合多庫表路由引擎。
// ComplexRoutingEngine.java @Override public RoutingResult route() { Collection<RoutingResult> result = new ArrayList<>(logicTables.size()); Collection<String> bindingTableNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); // 計算每一個邏輯表的簡單路由分片 for (String each : logicTables) { Optional<TableRule> tableRule = shardingRule.tryFindTableRule(each); if (tableRule.isPresent()) { if (!bindingTableNames.contains(each)) { result.add(new SimpleRoutingEngine(shardingRule, parameters, tableRule.get().getLogicTable(), sqlStatement).route()); } // 互爲 BindingTable 關係的表加到 bindingTableNames 裏,不重複計算分片 Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each); if (bindingTableRule.isPresent()) { bindingTableNames.addAll(Lists.transform(bindingTableRule.get().getTableRules(), new Function<TableRule, String>() { @Override public String apply(final TableRule input) { return input.getLogicTable(); } })); } } } log.trace("mixed tables sharding result: {}", result); if (result.isEmpty()) { throw new ShardingJdbcException("Cannot find table rule and default data source with logic tables: '%s'", logicTables); } // 防護性編程。shardingRule#isAllBindingTables() 已通過濾了這個狀況。 if (1 == result.size()) { return result.iterator().next(); } // 交給 CartesianRoutingEngine 造成笛卡爾積結果 return new CartesianRoutingEngine(result).route(); }
result.size == 1
,屬於防護性編程。CartesianRoutingEngine,笛卡爾積的庫表路由。
實現邏輯上相對複雜,請保持耐心喲,😈 其實目的就是實現連連看的效果:
x
RoutingResult[1] …… x
RoutingResult[n- 1] x
RoutingResult[n]// CartesianRoutingEngine.java @Override public CartesianRoutingResult route() { CartesianRoutingResult result = new CartesianRoutingResult(); for (Entry<String, Set<String>> entry : getDataSourceLogicTablesMap().entrySet()) { // Entry<數據源(庫), Set<邏輯表>> entry // 得到當前數據源(庫)的 路由表單元分組 List<Set<String>> actualTableGroups = getActualTableGroups(entry.getKey(), entry.getValue()); // List<Set<真實表>> List<Set<TableUnit>> tableUnitGroups = toTableUnitGroups(entry.getKey(), actualTableGroups); // 笛卡爾積,併合並結果 result.merge(entry.getKey(), getCartesianTableReferences(Sets.cartesianProduct(tableUnitGroups))); } log.trace("cartesian tables sharding result: {}", result); return result; }
下面,咱們一塊兒逐步看看代碼實現。
SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id
multi_db_multi_table_01 ├── t_order_0 ├── t_order_item_01 └── t_order_1 ├── t_order_item_02 multi_db_multi_table_02 ├── t_order_0 ├── t_order_item_01 └── t_order_1 ├── t_order_item_02
// 第一步 // CartesianRoutingEngine.java /** * 得到同庫對應的邏輯表集合 */ private Map<String, Set<String>> getDataSourceLogicTablesMap() { Collection<String> intersectionDataSources = getIntersectionDataSources(); Map<String, Set<String>> result = new HashMap<>(routingResults.size()); // 得到同庫對應的邏輯表集合 for (RoutingResult each : routingResults) { for (Entry<String, Set<String>> entry : each.getTableUnits().getDataSourceLogicTablesMap(intersectionDataSources).entrySet()) { // 過濾掉不在數據源(庫)交集的邏輯表 if (result.containsKey(entry.getKey())) { result.get(entry.getKey()).addAll(entry.getValue()); } else { result.put(entry.getKey(), entry.getValue()); } } } return result; } /** * 得到全部路由結果裏的數據源(庫)交集 */ private Collection<String> getIntersectionDataSources() { Collection<String> result = new HashSet<>(); for (RoutingResult each : routingResults) { if (result.isEmpty()) { result.addAll(each.getTableUnits().getDataSourceNames()); } result.retainAll(each.getTableUnits().getDataSourceNames()); // 交集 } return result; }
#getDataSourceLogicTablesMap()
返回如圖:// 第二步 // CartesianRoutingEngine.java private List<Set<String>> getActualTableGroups(final String dataSource, final Set<String> logicTables) { List<Set<String>> result = new ArrayList<>(logicTables.size()); for (RoutingResult each : routingResults) { result.addAll(each.getTableUnits().getActualTableNameGroups(dataSource, logicTables)); } return result; } private List<Set<TableUnit>> toTableUnitGroups(final String dataSource, final List<Set<String>> actualTableGroups) { List<Set<TableUnit>> result = new ArrayList<>(actualTableGroups.size()); for (Set<String> each : actualTableGroups) { result.add(new HashSet<>(Lists.transform(new ArrayList<>(each), new Function<String, TableUnit>() { @Override public TableUnit apply(final String input) { return findTableUnit(dataSource, input); } }))); } return result; }
#getActualTableGroups()
返回如圖:#toTableUnitGroups()
返回如圖:// CartesianRoutingEngine.java private List<CartesianTableReference> getCartesianTableReferences(final Set<List<TableUnit>> cartesianTableUnitGroups) { List<CartesianTableReference> result = new ArrayList<>(cartesianTableUnitGroups.size()); for (List<TableUnit> each : cartesianTableUnitGroups) { result.add(new CartesianTableReference(each)); } return result; } // CartesianRoutingResult.java @Getter private final List<CartesianDataSource> routingDataSources = new ArrayList<>(); void merge(final String dataSource, final Collection<CartesianTableReference> routingTableReferences) { for (CartesianTableReference each : routingTableReferences) { merge(dataSource, each); } } private void merge(final String dataSource, final CartesianTableReference routingTableReference) { for (CartesianDataSource each : routingDataSources) { if (each.getDataSource().equalsIgnoreCase(dataSource)) { each.getRoutingTableReferences().add(routingTableReference); return; } } routingDataSources.add(new CartesianDataSource(dataSource, routingTableReference)); }
Sets.cartesianProduct(tableUnitGroups)
返回如圖(Guava 工具庫真強大):#getCartesianTableReferences()
返回如圖:
x
TableUnit[1] …… x
TableUnit[n]。例如圖中:t_order_01 x t_order_item_02
,最終轉換成 SQL 爲 SELECT * FROM t_order_01 o join t_order_item_02 i ON o.order_id = i.order_id
。#merge()
合併笛卡爾積路由結果。CartesianRoutingResult 包含多個 CartesianDataSource,所以須要將 CartesianTableReference 合併(添加)到對應的 CartesianDataSource。固然,目前在實現時已是按照數據源(庫)生成對應的 CartesianTableReference。
// ParsingSQLRouter.java @Override public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) { final Context context = MetricsContext.start("Route SQL"); SQLRouteResult result = new SQLRouteResult(sqlStatement); // 處理 插入SQL 主鍵字段 if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) { processGeneratedKey(parameters, (InsertStatement) sqlStatement, result); } // 🐒🐒🐒 路由 🐒🐒🐒 RoutingResult routingResult = route(parameters, sqlStatement); // SQL重寫引擎 SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement); boolean isSingleRouting = routingResult.isSingleRouting(); // 處理分頁 if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) { processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting); } // SQL 重寫 SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting); // 生成 ExecutionUnit if (routingResult instanceof CartesianRoutingResult) { for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) { for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) { result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder))); // 生成 SQL } } } else { for (TableUnit each : routingResult.getTableUnits().getTableUnits()) { result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder))); // 生成 SQL } } MetricsContext.stop(context); // 打印 SQL if (showSQL) { SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters); } return result; }
RoutingResult routingResult = route(parameters, sqlStatement);
調用的就是上文分析的 SimpleRoutingEngine、ComplexRoutingEngine、CartesianRoutingEngine 的 #route()
方法。#processGeneratedKey()
、#processLimit()
、#rewrite()
、#generateSQL()
等會放在《SQL 改寫》 分享。篇幅有些長,但願能讓你們對路由有比較完整的認識。
若是內容有錯誤,煩請您指正,我會認真修改。
若是表述不清晰,不太理解的,歡迎加我微信(wangwenbin-server)一塊兒探討。
謝謝你技術這麼好,還耐心看完了本文。
強制路由 HintManager 講的相對略過,能夠看以下內容進一步瞭解:
厚着臉皮,道友,辛苦分享朋友圈可好?!