數據庫分庫分表中間件 Sharding-JDBC 源碼分析 —— SQL 路由(二)之分庫分表路由

🙂🙂🙂關注微信公衆號:【芋道源碼】有福利:java

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

本文主要基於 Sharding-JDBC 1.5.0 正式版git


1. 概述

本文分享分表分庫路由相關的實現。涉及內容以下:github

  1. SQL 路由結果
  2. 路由策略 x 算法
  3. SQL 路由器

內容順序如編號。算法

Sharding-JDBC 正在收集使用公司名單:傳送門
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門
sql

SQL 路由大致流程以下:數據庫

2. SQLRouteResult

通過 SQL解析SQL路由後,產生SQL路由結果,即 SQLRouteResult。根據路由結果,生成SQL執行SQL編程

  • sqlStatement :SQL語句對象,通過SQL解析的結果對象。
  • executionUnits :SQL最小執行單元集合。SQL執行時,執行每一個單元。
  • generatedKeys插入SQL語句生成的主鍵編號集合。目前不支持批量插入而使用集合的緣由,猜想是爲了將來支持批量插入作準備。

3. 路由策略 x 算法

ShardingStrategy,分片策略。目前支持兩種分片:api

分片資源:在分庫策略裏指的是庫,在分表策略裏指的是表。微信

【1】 計算靜態分片(經常使用)app

// ShardingStrategy.java
/**
* 計算靜態分片.
* @param sqlType SQL語句的類型
* @param availableTargetNames 全部的可用分片資源集合
* @param shardingValues 分片值集合
* @return 分庫後指向的數據源名稱集合
*/
public Collection<String> doStaticSharding(final SQLType sqlType, final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {
   Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
   if (shardingValues.isEmpty()) {
       Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames), "INSERT statement should contain sharding value."); // 插入不能有多資源對象
       result.addAll(availableTargetNames);
   } else {
       result.addAll(doSharding(shardingValues, availableTargetNames));
   }
   return result;
}
/**
* 插入SQL 是否插入多個分片
* @param sqlType SQL類型
* @param availableTargetNames 全部的可用分片資源集合
* @return 是否
*/
private boolean isInsertMultiple(final SQLType sqlType, final Collection<String> availableTargetNames) {
   return SQLType.INSERT == sqlType && availableTargetNames.size() > 1;
}
  • 插入SQL 須要有片鍵值,不然沒法判斷單個分片資源。(Sharding-JDBC 目前僅支持單條記錄插入)

【2】計算動態分片

// ShardingStrategy.java
/**
* 計算動態分片.
* @param shardingValues 分片值集合
* @return 分庫後指向的分片資源集合
*/
public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) {
   Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); // 動態分片必須有分片值
   Collection<String> availableTargetNames = Collections.emptyList();
   Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
   result.addAll(doSharding(shardingValues, availableTargetNames));
   return result;
}
  • 動態分片對應 TableRule.dynamic=true
  • 動態分片必須有分片值

😈 悶了,看起來二者沒啥區別?答案在分片算法上。咱們先看 #doSharding() 方法的實現。

// ShardingStrategy.java
/**
* 計算分片
* @param shardingValues 分片值集合
* @param availableTargetNames 全部的可用分片資源集合
* @return 分庫後指向的分片資源集合
*/
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {
   // 無片鍵
   if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
       return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
   }
   // 單片鍵
   if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
       SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
       ShardingValue shardingValue = shardingValues.iterator().next();
       switch (shardingValue.getType()) {
           case SINGLE:
               return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
           case LIST:
               return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
           case RANGE:
               return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
           default:
               throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());
       }
   }
   // 多片鍵
   if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {
       return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
   }
   throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());
}
  • 無分片鍵算法:對應 NoneKeyShardingAlgorithm 分片算法接口。
public interface NoneKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
    String doSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
}
  • 單片鍵算法:對應 SingleKeyShardingAlgorithm 分片算法接口。
public interface SingleKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
    String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
    Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
    Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
}
ShardingValueType SQL 操做符 接口方法
SINGLE = #doEqualSharding()
LIST IN #doInSharding()
RANGE BETWEEN #doBetweenSharding()
  • 多片鍵算法:對應 MultipleKeysShardingAlgorithm 分片算法接口。
public interface MultipleKeysShardingAlgorithm extends ShardingAlgorithm {
    Collection<String> doSharding(Collection<String> availableTargetNames, Collection<ShardingValue<?>> shardingValues);
}

分片算法類結構以下:

來看看 Sharding-JDBC 實現的無需分庫的分片算法 NoneDatabaseShardingAlgorithm (NoneTableShardingAlgorithm 基本如出一轍):

public final class NoneDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<String>, MultipleKeysDatabaseShardingAlgorithm { 
    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {
        return availableTargetNames;
    }
    @Override
    public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {
        return availableTargetNames.isEmpty() ? null : availableTargetNames.iterator().next();
    }
    @Override
    public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {
        return availableTargetNames;
    }
    @Override
    public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {
        return availableTargetNames;
    }
}
  • 必定要注意,NoneXXXXShardingAlgorithm 只適用於無分庫/表的需求,不然會是錯誤的路由結果。例如,#doEqualSharding() 返回的是第一個分片資源。

再來看測試目錄下實現的餘數基偶分表算法 ModuloTableShardingAlgorithm 的實現:

// com.dangdang.ddframe.rdb.integrate.fixture.ModuloTableShardingAlgorithm.java
public final class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {
    @Override
    public String doEqualSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
        for (String each : tableNames) {
            if (each.endsWith(shardingValue.getValue() % 2 + "")) {
                return each;
            }
        }
        throw new UnsupportedOperationException();
    }
    @Override
    public Collection<String> doInSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(tableNames.size());
        for (Integer value : shardingValue.getValues()) {
            for (String tableName : tableNames) {
                if (tableName.endsWith(value % 2 + "")) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }
    @Override
    public Collection<String> doBetweenSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(tableNames.size());
        Range<Integer> range = shardingValue.getValueRange();
        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
            for (String each : tableNames) {
                if (each.endsWith(i % 2 + "")) {
                    result.add(each);
                }
            }
        }
        return result;
    }
}

😈 來看看動態計算分片須要怎麼實現分片算法。

// com.dangdang.ddframe.rdb.integrate.fixture.SingleKeyDynamicModuloTableShardingAlgorithm.java
public final class SingleKeyDynamicModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {
    /**
    * 表前綴
    */
    private final String tablePrefix;
    @Override
    public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
        return tablePrefix + shardingValue.getValue() % 10;
    }
    @Override
    public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(shardingValue.getValues().size());
        for (Integer value : shardingValue.getValues()) {
            result.add(tablePrefix + value % 10);
        }
        return result;
    }
    @Override
    public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        Range<Integer> range = shardingValue.getValueRange();
        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
            result.add(tablePrefix + i % 10);
        }
        return result;
    }
}
  • 騷年,是否是明白了一些?動態表無需把真實表配置到 TableRule,而是經過分片算法計算出真實表

4. SQL 路由

SQLRouter,SQL 路由器接口,共有兩種實現:

  • DatabaseHintSQLRouter:經過提示且僅路由至數據庫的SQL路由器
  • ParsingSQLRouter:須要解析的SQL路由器

它們實現 #parse()進行SQL解析#route()進行SQL路由


RoutingEngine,路由引擎接口,共有四種實現:

  • DatabaseHintRoutingEngine:基於數據庫提示的路由引擎
  • SimpleRoutingEngine:簡單路由引擎
  • CartesianRoutingEngine:笛卡爾積的庫表路由
  • ComplexRoutingEngine:混合多庫表路由引擎

ComplexRoutingEngine 根據路由結果會轉化成 SimpleRoutingEngine 或 ComplexRoutingEngine。下文會看相應源碼。


路由結果有兩種:

  • RoutingResult:簡單路由結果
  • CartesianRoutingResult:笛卡爾積路由結果

從圖中,咱們已經能大概看到二者有什麼區別,更具體的下文隨源碼一塊兒分享。

😈 SQLRouteResult 和 RoutingResult 有什麼區別?

  • SQLRouteResult:整個SQL路由返回的路由結果
  • RoutingResult:RoutingEngine返回路由結果


一會兒看到這麼多"對象",可能有點緊張。沒關係張,咱們一塊兒在整理下。

路由器 路由引擎 路由結果
DatabaseHintSQLRouter DatabaseHintRoutingEngine RoutingResult
ParsingSQLRouter SimpleRoutingEngine RoutingResult
ParsingSQLRouter CartesianRoutingEngine CartesianRoutingResult

😈 逗比博主給你們解決了"對象",是否是應該分享朋友圈

5. DatabaseHintSQLRouter

DatabaseHintSQLRouter,基於數據庫提示的路由引擎。路由器工廠 SQLRouterFactory 建立路由器時,判斷到使用數據庫提示( Hint ) 時,建立 DatabaseHintSQLRouter。

// DatabaseHintRoutingEngine.java
public static SQLRouter createSQLRouter(final ShardingContext shardingContext) {
   return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext);
}

先來看下 HintManagerHolder、HintManager 部分相關的代碼:

// HintManagerHolder.java
public final class HintManagerHolder {
    /**
     * HintManager 線程變量
     */
    private static final ThreadLocal<HintManager> HINT_MANAGER_HOLDER = new ThreadLocal<>();
    /**
     * 判斷是否當前只分庫.
     * 
     * @return 是否當前只分庫.
     */
    public static boolean isDatabaseShardingOnly() {
        return null != HINT_MANAGER_HOLDER.get() && HINT_MANAGER_HOLDER.get().isDatabaseShardingOnly();
    }
    /**
     * 清理線索分片管理器的本地線程持有者.
     */
    public static void clear() {
        HINT_MANAGER_HOLDER.remove();
    }
}

// HintManager.java
public final class HintManager implements AutoCloseable {
    /**
     * 庫分片值集合
     */
    private final Map<ShardingKey, ShardingValue<?>> databaseShardingValues = new HashMap<>();
    /**
     * 只作庫分片
     * {@link DatabaseHintRoutingEngine}
     */
    @Getter
    private boolean databaseShardingOnly;
    /**
     * 獲取線索分片管理器實例.
     * 
     * @return 線索分片管理器實例
     */
    public static HintManager getInstance() {
        HintManager result = new HintManager();
        HintManagerHolder.setHintManager(result);
        return result;
    }
    /**
     * 設置分庫分片值.
     * 
     * <p>分片操做符爲等號.該方法適用於只分庫的場景</p>
     * 
     * @param value 分片值
     */
    public void setDatabaseShardingValue(final Comparable<?> value) {
        databaseShardingOnly = true;
        addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME, value);
    }
}

那麼若是要使用 DatabaseHintSQLRouter,咱們只須要 HintManager.getInstance().setDatabaseShardingValue(庫分片值) 便可。這裏有兩點要注意下:

  • HintManager#getInstance(),每次獲取到的都是的 HintManager,屢次賦值須要當心。
  • HintManager#close(),使用完須要去清理,避免下個請求讀到遺漏的線程變量。

看看 DatabaseHintSQLRouter 的實現:

// DatabaseHintSQLRouter.java
@Override
public SQLStatement parse(final String logicSQL, final int parametersSize) {
   return new SQLJudgeEngine(logicSQL).judge(); // 只解析 SQL 類型
}  
@Override
// TODO insert的SQL仍然須要解析自增主鍵
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
   Context context = MetricsContext.start("Route SQL");
   SQLRouteResult result = new SQLRouteResult(sqlStatement);
   // 路由
   RoutingResult routingResult = new DatabaseHintRoutingEngine(shardingRule.getDataSourceRule(), shardingRule.getDatabaseShardingStrategy(), sqlStatement.getType())
           .route();
   // SQL最小執行單元
   for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
       result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), logicSQL));
   }
   MetricsContext.stop(context);
   if (showSQL) {
       SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
   }
   return result;
}
  • #parse() 只解析了 SQL 類型,即 SELECT / UPDATE / DELETE / INSERT 。
  • 使用的分庫策略來自 ShardingRule,不是 TableRule,這個必定要留心。❓由於 SQL 未解析表名。所以,即便在 TableRule 設置了 actualTables 屬性也是沒有效果的。
  • 目前不支持 Sharding-JDBC 的主鍵自增。❓由於 SQL 未解析自增主鍵。從代碼上的TODO應該會支持。
  • HintManager.getInstance().setDatabaseShardingValue(庫分片值) 設置的庫分片值使用的是 EQUALS,於是分庫策略計算出來的只有一個庫分片,即 TableUnit 只有一個,SQLExecutionUnit 只有一個。

看看 DatabaseHintSQLRouter 的實現:

// DatabaseHintRoutingEngine.java
@Override
public RoutingResult route() {
   // 從 Hint 得到 分片鍵值
   Optional<ShardingValue<?>> shardingValue = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME));
   Preconditions.checkState(shardingValue.isPresent());
   log.debug("Before database sharding only db:{} sharding values: {}", dataSourceRule.getDataSourceNames(), shardingValue.get());
   // 路由。表分片規則使用的是 ShardingRule 裏的。由於沒 SQL 解析。
   Collection<String> routingDataSources = databaseShardingStrategy.doStaticSharding(sqlType, dataSourceRule.getDataSourceNames(), Collections.<ShardingValue<?>>singleton(shardingValue.get()));
   Preconditions.checkState(!routingDataSources.isEmpty(), "no database route info");
   log.debug("After database sharding only result: {}", routingDataSources);
   // 路由結果
   RoutingResult result = new RoutingResult();
   for (String each : routingDataSources) {
       result.getTableUnits().getTableUnits().add(new TableUnit(each, "", ""));
   }
   return result;
}
  • 調用 databaseShardingStrategy.doStaticSharding() 方法計算分片。
  • new TableUnit(each, "", "")logicTableNameactualTableName 都是空串,相信緣由你已經知道。

6. ParsingSQLRouter

ParsingSQLRouter,須要解析的SQL路由器。

ParsingSQLRouter 使用 SQLParsingEngine 解析SQL。對SQL解析有興趣的同窗能夠看看拙做《Sharding-JDBC 源碼分析 —— SQL 解析》

// ParsingSQLRouter.java
public SQLStatement parse(final String logicSQL, final int parametersSize) {
   SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);
   Context context = MetricsContext.start("Parse SQL");
   SQLStatement result = parsingEngine.parse();
   if (result instanceof InsertStatement) {
       ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
   }
   MetricsContext.stop(context);
   return result;
}

ParsingSQLRouter 在路由時,會根據表狀況使用 SimpleRoutingEngine 或 CartesianRoutingEngine 進行路由。

private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
   Collection<String> tableNames = sqlStatement.getTables().getTableNames();
   RoutingEngine routingEngine;
   if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) {
       routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
   } else {
       // TODO 可配置是否執行笛卡爾積
       routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
   }
   return routingEngine.route();
}
  • 當只進行一張表或者多表互爲BindingTable關係時,使用 SimpleRoutingEngine 簡單路由引擎。多表互爲BindingTable關係時,每張表的路由結果是相同的,因此只要計算第一張表的分片便可。
  • tableNames.iterator().next() 注意下,tableNames 變量是 new TreeMap<>(String.CASE_INSENSITIVE_ORDER)。因此 SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id 即便 t_order_item 排在 t_order 前面,tableNames.iterator().next() 返回的是 t_order。當 t_ordert_order_itemBindingTable關係 時,計算的是 t_order 路由分片。
  • BindingTable關係在 ShardingRule 的 tableRules 配置。配置該關係 TableRule 有以下須要遵照的規則:
    • 分片策略與算法相同
    • 數據源配置對象相同
    • 真實表數量相同

舉個例子

  • SQL :SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id
  • 分庫分表狀況:
multi_db_multi_table_01
  ├── t_order_0                        ├── t_order_item_01
  └── t_order_1                        ├── t_order_item_02
                                       ├── t_order_item_03
                                       ├── t_order_item_04
multi_db_multi_table_02
  ├── t_order_0                        ├── t_order_item_01
  └── t_order_1                        ├── t_order_item_02
                                       ├── t_order_item_03
                                       ├── t_order_item_04

最終執行的SQL以下:

SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id 
SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id 
SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
  • t_order_item_03t_order_item_04 沒法被查詢到。

下面咱們看看 #isAllBindingTables() 如何實現多表互爲BindingTable關係

// ShardingRule.java
// 調用順序 #isAllBindingTables()=>#filterAllBindingTables()=>#findBindingTableRule()=>#findBindingTableRule()
/**
* 判斷邏輯表名稱集合是否所有屬於Binding表.
* @param logicTables 邏輯表名稱集合
*/
public boolean isAllBindingTables(final Collection<String> logicTables) {
   Collection<String> bindingTables = filterAllBindingTables(logicTables);
   return !bindingTables.isEmpty() && bindingTables.containsAll(logicTables);
}
/**
* 過濾出全部的Binding表名稱.
*/
public Collection<String> filterAllBindingTables(final Collection<String> logicTables) {
   if (logicTables.isEmpty()) {
       return Collections.emptyList();
   }
   Optional<BindingTableRule> bindingTableRule = findBindingTableRule(logicTables);
   if (!bindingTableRule.isPresent()) {
       return Collections.emptyList();
   }
   // 交集
   Collection<String> result = new ArrayList<>(bindingTableRule.get().getAllLogicTables());
   result.retainAll(logicTables);
   return result;
}
/**
* 得到包含<strong>任一</strong>在邏輯表名稱集合的binding表配置的邏輯表名稱集合
*/
private Optional<BindingTableRule> findBindingTableRule(final Collection<String> logicTables) {
   for (String each : logicTables) {
       Optional<BindingTableRule> result = findBindingTableRule(each);
       if (result.isPresent()) {
           return result;
       }
   }
   return Optional.absent();
}
/**
* 根據邏輯表名稱獲取binding表配置的邏輯表名稱集合.
*/
public Optional<BindingTableRule> findBindingTableRule(final String logicTable) {
   for (BindingTableRule each : bindingTableRules) {
       if (each.hasLogicTable(logicTable)) {
           return Optional.of(each);
       }
   }
   return Optional.absent();
}
  • 邏輯看起來比較長,目的是找到一條 BindingTableRule 包含全部邏輯表集合
  • 不支持《傳遞關係》:配置 BindingTableRule 時,相同綁定關係必定要配置在一條,必須是 [a, b, c],而不能是 [a, b], [b, c]

6.1 SimpleRoutingEngine

SimpleRoutingEngine,簡單路由引擎。

// SimpleRoutingEngine.java
private Collection<String> routeDataSources(final TableRule tableRule) {
   DatabaseShardingStrategy strategy = shardingRule.getDatabaseShardingStrategy(tableRule);
   List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getDatabaseShardingValuesFromHint(strategy.getShardingColumns())
           : getShardingValues(strategy.getShardingColumns());
   Collection<String> result = strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualDatasourceNames(), shardingValues);
   Preconditions.checkState(!result.isEmpty(), "no database route info");
   return result;
}
private List<ShardingValue<?>> getShardingValues(final Collection<String> shardingColumns) {
   List<ShardingValue<?>> result = new ArrayList<>(shardingColumns.size());
   for (String each : shardingColumns) {
       Optional<Condition> condition = sqlStatement.getConditions().find(new Column(each, logicTableName));
       if (condition.isPresent()) {
           result.add(condition.get().getShardingValue(parameters));
       }
   }
   return result;
}
  • 可使用 HintManager 設置分片值進行強制路由
  • #getShardingValues() 咱們看到了《SQL 解析(二)之SQL解析》分享的 Condition 對象。以前咱們提到過Parser 半理解SQL的目的之一是:提煉分片上下文,此處便是該目的的體現。Condition 裏只放明確影響路由的條件,例如:order_id = 1, order_id IN (1, 2), order_id BETWEEN (1, 3),不放沒法計算的條件,例如:o.order_id = i.order_id。該方法裏,使用分片鍵從 Condition 查找 分片值。🙂 是否是對 Condition 的認識更加清晰一丟丟落。
// SimpleRoutingEngine.java
private Collection<String> routeTables(final TableRule tableRule, final Collection<String> routedDataSources) {
   TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule);
   List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns())
           : getShardingValues(strategy.getShardingColumns());
   Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues)
           : strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualTableNames(routedDataSources), shardingValues);
   Preconditions.checkState(!result.isEmpty(), "no table route info");
   return result;
}
  • 可使用 HintManager 設置分片值進行強制路由
  • 根據 dynamic 屬性來判斷調用 #doDynamicSharding() 仍是 #doStaticSharding() 計算分片。
// SimpleRoutingEngine.java
private RoutingResult generateRoutingResult(final TableRule tableRule, final Collection<String> routedDataSources, final Collection<String> routedTables) {
   RoutingResult result = new RoutingResult();
   for (DataNode each : tableRule.getActualDataNodes(routedDataSources, routedTables)) {
       result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName()));
   }
   return result;
}

// TableRule.java
/**
* 根據數據源名稱過濾獲取真實數據單元.
* @param targetDataSources 數據源名稱集合
* @param targetTables 真實表名稱集合
* @return 真實數據單元
*/
public Collection<DataNode> getActualDataNodes(final Collection<String> targetDataSources, final Collection<String> targetTables) {
   return dynamic ? getDynamicDataNodes(targetDataSources, targetTables) : getStaticDataNodes(targetDataSources, targetTables);
}  
private Collection<DataNode> getDynamicDataNodes(final Collection<String> targetDataSources, final Collection<String> targetTables) {
   Collection<DataNode> result = new LinkedHashSet<>(targetDataSources.size() * targetTables.size());
   for (String targetDataSource : targetDataSources) {
       for (String targetTable : targetTables) {
           result.add(new DataNode(targetDataSource, targetTable));
       }
   }
   return result;
} 
private Collection<DataNode> getStaticDataNodes(final Collection<String> targetDataSources, final Collection<String> targetTables) {
   Collection<DataNode> result = new LinkedHashSet<>(actualTables.size());
   for (DataNode each : actualTables) {
       if (targetDataSources.contains(each.getDataSourceName()) && targetTables.contains(each.getTableName())) {
           result.add(each);
       }
   }
   return result;
}
  • 在 SimpleRoutingEngine 只生成了當前表的 TableUnits。若是存在與其互爲BindingTable關係的表的 TableUnits 怎麼得到?你能夠想一想噢,固然在後文《SQL 改寫》也會給出答案,看看和你想的是否同樣。

6.2 ComplexRoutingEngine

ComplexRoutingEngine,混合多庫表路由引擎。

// ComplexRoutingEngine.java
@Override
public RoutingResult route() {
   Collection<RoutingResult> result = new ArrayList<>(logicTables.size());
   Collection<String> bindingTableNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
   // 計算每一個邏輯表的簡單路由分片
   for (String each : logicTables) {
       Optional<TableRule> tableRule = shardingRule.tryFindTableRule(each);
       if (tableRule.isPresent()) {
           if (!bindingTableNames.contains(each)) {
               result.add(new SimpleRoutingEngine(shardingRule, parameters, tableRule.get().getLogicTable(), sqlStatement).route());
           }
           // 互爲 BindingTable 關係的表加到 bindingTableNames 裏,不重複計算分片
           Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each);
           if (bindingTableRule.isPresent()) {
               bindingTableNames.addAll(Lists.transform(bindingTableRule.get().getTableRules(), new Function<TableRule, String>() {
                   
                   @Override
                   public String apply(final TableRule input) {
                       return input.getLogicTable();
                   }
               }));
           }
       }
   }
   log.trace("mixed tables sharding result: {}", result);
   if (result.isEmpty()) {
       throw new ShardingJdbcException("Cannot find table rule and default data source with logic tables: '%s'", logicTables);
   }
   // 防護性編程。shardingRule#isAllBindingTables() 已通過濾了這個狀況。
   if (1 == result.size()) {
       return result.iterator().next();
   }
   // 交給 CartesianRoutingEngine 造成笛卡爾積結果
   return new CartesianRoutingEngine(result).route();
}
  • ComplexRoutingEngine 計算每一個邏輯表的簡單路由分片,路由結果交給 CartesianRoutingEngine 繼續路由造成笛卡爾積結果。

  • 因爲目前 ComplexRoutingEngine 路由前已經判斷所有表互爲 BindingTable 關係,於是不會出現 result.size == 1,屬於防護性編程。
  • 部分表互爲 BindingTable 關係時,ComplexRoutingEngine 不重複計算分片。

6.3 CartesianRoutingEngine

CartesianRoutingEngine,笛卡爾積的庫表路由。

實現邏輯上相對複雜,請保持耐心喲,😈 其實目的就是實現連連看的效果:

  • RoutingResult[0] x RoutingResult[1] …… x RoutingResult[n- 1] x RoutingResult[n]
  • 同庫 才能夠進行笛卡爾積
// CartesianRoutingEngine.java
@Override
public CartesianRoutingResult route() {
   CartesianRoutingResult result = new CartesianRoutingResult();
   for (Entry<String, Set<String>> entry : getDataSourceLogicTablesMap().entrySet()) { // Entry<數據源(庫), Set<邏輯表>> entry
       // 得到當前數據源(庫)的 路由表單元分組
       List<Set<String>> actualTableGroups = getActualTableGroups(entry.getKey(), entry.getValue()); // List<Set<真實表>>
       List<Set<TableUnit>> tableUnitGroups = toTableUnitGroups(entry.getKey(), actualTableGroups);
       // 笛卡爾積,併合並結果
       result.merge(entry.getKey(), getCartesianTableReferences(Sets.cartesianProduct(tableUnitGroups)));
   }
   log.trace("cartesian tables sharding result: {}", result);
   return result;
}
  • 第一步,得到同庫對應的邏輯表集合,即 Entry<數據源(庫), Set <邏輯表> > entry
  • 第二步,遍歷數據源(庫),得到當前數據源(庫)路由表單元分組
  • 第三步,對路由表單元分組進行笛卡爾積,併合併到路由結果。

下面,咱們一塊兒逐步看看代碼實現。

  • SQL :SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id
  • 分庫分表狀況:
multi_db_multi_table_01
  ├── t_order_0                        ├── t_order_item_01
  └── t_order_1                        ├── t_order_item_02
multi_db_multi_table_02
  ├── t_order_0                        ├── t_order_item_01
  └── t_order_1                        ├── t_order_item_02
// 第一步
// CartesianRoutingEngine.java
/**
* 得到同庫對應的邏輯表集合
*/
private Map<String, Set<String>> getDataSourceLogicTablesMap() {
   Collection<String> intersectionDataSources = getIntersectionDataSources();
   Map<String, Set<String>> result = new HashMap<>(routingResults.size());
   // 得到同庫對應的邏輯表集合
   for (RoutingResult each : routingResults) {
       for (Entry<String, Set<String>> entry : each.getTableUnits().getDataSourceLogicTablesMap(intersectionDataSources).entrySet()) { // 過濾掉不在數據源(庫)交集的邏輯表
           if (result.containsKey(entry.getKey())) {
               result.get(entry.getKey()).addAll(entry.getValue());
           } else {
               result.put(entry.getKey(), entry.getValue());
           }
       }
   }
   return result;
}
/**
* 得到全部路由結果裏的數據源(庫)交集
*/
private Collection<String> getIntersectionDataSources() {
   Collection<String> result = new HashSet<>();
   for (RoutingResult each : routingResults) {
       if (result.isEmpty()) {
           result.addAll(each.getTableUnits().getDataSourceNames());
       }
       result.retainAll(each.getTableUnits().getDataSourceNames()); // 交集
   }
   return result;
}
  • #getDataSourceLogicTablesMap() 返回如圖:

// 第二步
// CartesianRoutingEngine.java
private List<Set<String>> getActualTableGroups(final String dataSource, final Set<String> logicTables) {
   List<Set<String>> result = new ArrayList<>(logicTables.size());
   for (RoutingResult each : routingResults) {
       result.addAll(each.getTableUnits().getActualTableNameGroups(dataSource, logicTables));
   }
   return result;
}
private List<Set<TableUnit>> toTableUnitGroups(final String dataSource, final List<Set<String>> actualTableGroups) {
   List<Set<TableUnit>> result = new ArrayList<>(actualTableGroups.size());
   for (Set<String> each : actualTableGroups) {
       result.add(new HashSet<>(Lists.transform(new ArrayList<>(each), new Function<String, TableUnit>() {
    
           @Override
           public TableUnit apply(final String input) {
               return findTableUnit(dataSource, input);
           }
       })));
   }
   return result;
}
  • #getActualTableGroups() 返回如圖:
  • #toTableUnitGroups() 返回如圖:

// CartesianRoutingEngine.java
private List<CartesianTableReference> getCartesianTableReferences(final Set<List<TableUnit>> cartesianTableUnitGroups) {
   List<CartesianTableReference> result = new ArrayList<>(cartesianTableUnitGroups.size());
   for (List<TableUnit> each : cartesianTableUnitGroups) {
       result.add(new CartesianTableReference(each));
   }
   return result;
}

// CartesianRoutingResult.java
@Getter
private final List<CartesianDataSource> routingDataSources = new ArrayList<>();
void merge(final String dataSource, final Collection<CartesianTableReference> routingTableReferences) {
   for (CartesianTableReference each : routingTableReferences) {
       merge(dataSource, each);
   }
}
private void merge(final String dataSource, final CartesianTableReference routingTableReference) {
   for (CartesianDataSource each : routingDataSources) {
       if (each.getDataSource().equalsIgnoreCase(dataSource)) {
           each.getRoutingTableReferences().add(routingTableReference);
           return;
       }
   }
   routingDataSources.add(new CartesianDataSource(dataSource, routingTableReference));
}
  • Sets.cartesianProduct(tableUnitGroups) 返回如圖(Guava 工具庫真強大):
  • #getCartesianTableReferences() 返回如圖:

    CartesianTableReference,笛卡爾積表路由組,包含多條 TableUnit,即 TableUnit[0] x TableUnit[1] …… x TableUnit[n]。例如圖中:t_order_01 x t_order_item_02,最終轉換成 SQL 爲 SELECT * FROM t_order_01 o join t_order_item_02 i ON o.order_id = i.order_id
  • #merge() 合併笛卡爾積路由結果。CartesianRoutingResult 包含多個 CartesianDataSource,所以須要將 CartesianTableReference 合併(添加)到對應的 CartesianDataSource。固然,目前在實現時已是按照數據源(庫)生成對應的 CartesianTableReference。

6.4 ParsingSQLRouter 主#route()

// ParsingSQLRouter.java
@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
   final Context context = MetricsContext.start("Route SQL");
   SQLRouteResult result = new SQLRouteResult(sqlStatement);
   // 處理 插入SQL 主鍵字段
   if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
       processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
   }
   // 🐒🐒🐒 路由 🐒🐒🐒
   RoutingResult routingResult = route(parameters, sqlStatement);
   // SQL重寫引擎
   SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement);
   boolean isSingleRouting = routingResult.isSingleRouting();
   // 處理分頁
   if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
       processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
   }
   // SQL 重寫
   SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
   // 生成 ExecutionUnit
   if (routingResult instanceof CartesianRoutingResult) {
       for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
           for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
               result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder))); // 生成 SQL
           }
       }
   } else {
       for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
           result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder))); // 生成 SQL
       }
   }
   MetricsContext.stop(context);
   // 打印 SQL
   if (showSQL) {
       SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
   }
   return result;
}
  • RoutingResult routingResult = route(parameters, sqlStatement); 調用的就是上文分析的 SimpleRoutingEngine、ComplexRoutingEngine、CartesianRoutingEngine 的 #route() 方法。
  • #processGeneratedKey()#processLimit()#rewrite()#generateSQL() 等會放在《SQL 改寫》 分享。

666. 彩蛋

篇幅有些長,但願能讓你們對路由有比較完整的認識。
若是內容有錯誤,煩請您指正,我會認真修改。
若是表述不清晰,不太理解的,歡迎加我微信(wangwenbin-server)一塊兒探討。

謝謝你技術這麼好,還耐心看完了本文。

強制路由 HintManager 講的相對略過,能夠看以下內容進一步瞭解:

  1. 《官方文檔-強制路由》
  2. HintManager.java 源碼

厚着臉皮,道友,辛苦分享朋友圈可好?!

相關文章
相關標籤/搜索