sharding-jdbc源碼解析之sql單表或綁定表路由

說在前面sql

本文轉自「天河聊技術」微信公衆號數據庫

 

sql路由這裏的內容比較多,包含單表路由或者綁定表路由、多庫多表路由、笛卡爾積路由,分三部分來介紹,今天先介紹單表或綁定表路由。微信

 

sql路由源碼解析app

 

com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine、com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine兩個sql路由引擎類,預編譯的用的比較多,咱們以預編譯的Statement的引擎類來跟蹤下sharding-jdbc是對sql怎麼進行路由的。ide

 

上層sql執行器接收到邏輯sql後再進行sql路由的時候會建立預編譯statement對象的路由器,所以會調用其構造器性能

/**
 * 預解析的SQL路由器.
 * 
 * @author zhangliang
 */
public final class PreparedStatementRoutingEngine {

//    邏輯sql
    private final String logicSQL;

//sql路由器
    private final SQLRouter sqlRouter;

//    sql語句對象
    private SQLStatement sqlStatement;
    
    public PreparedStatementRoutingEngine(final String logicSQL, final ShardingContext shardingContext) {
        this.logicSQL = logicSQL;
        sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext);
    }
/**
 * 路由引擎工廠.
 * 
 * @author zhangiang
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class SQLRouterFactory {
    
    /**
     * 建立SQL路由器.
     * 
     * @param shardingContext 數據源運行期上下文
     * @return SQL路由器
     */
//    這裏是靜態工廠方法實現
    public static SQLRouter createSQLRouter(final ShardingContext shardingContext) {
        return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext);
    }
}

接下來會建立ParsingSQLRouter對象ui

**
 * 須要解析的SQL路由器.
 * 
 * @author zhangiang
 */
public final class ParsingSQLRouter implements SQLRouter {

//    分庫分表配置對象
    private final ShardingRule shardingRule;

//    支持的數據庫類型
    private final DatabaseType databaseType;

//    是否要展現sql
    private final boolean showSQL;
    
    private final List<Number> generatedKeys;

//    上面這些屬性值都是存儲在分片上下文中
    public ParsingSQLRouter(final ShardingContext shardingContext) {
        shardingRule = shardingContext.getShardingRule();
        databaseType = shardingContext.getDatabaseType();
        showSQL = shardingContext.isShowSQL();
        generatedKeys = new LinkedList<>();
    }

這個方法是sql路由的入口方法this

/**
 * SQL路由.
 * 當第一次路由時進行SQL解析,以後的路由複用第一次的解析結果.
 * 
 * @param parameters SQL中的參數
 * @return 路由結果
 */
public SQLRouteResult route(final List<Object> parameters) {//sql路由業務方法
    if (null == sqlStatement) {
        sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
    }
    return sqlRouter.route(logicSQL, parameters, sqlStatement);
}

進入到這個parse方法對象

sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
@Override
    public SQLStatement parse(final String logicSQL, final int parametersSize) {
//        建立sql解析引擎
        SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);
//        開啓度量上下文
        Context context = MetricsContext.start("Parse SQL");
        
//        sql解析器解析得到sql語句對象
        SQLStatement result = parsingEngine.parse();
        if (result instanceof InsertStatement) {
            ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
        }
        MetricsContext.stop(context);
        return result;
    }

進入下面的sql路由方法,返回路由結果ip

return sqlRouter.route(logicSQL, parameters, sqlStatement);
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
        Collection<String> tableNames = sqlStatement.getTables().getTableNames();
        RoutingEngine routingEngine;
//        若是表集合是1,或者是綁定表路由就走簡單路由規則
        if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) {
            routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//單表路由
        } else {
            // TODO 可配置是否執行笛卡爾積
            routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
        }
        return routingEngine.route();//tianhe TODO 笛卡爾積
    }
建立簡單路由引擎
routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//單表路由
/**
 * 簡單路由引擎.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
public final class SimpleRoutingEngine implements RoutingEngine {
//    分庫分表配置對象
    private final ShardingRule shardingRule;
//    sql參數
    private final List<Object> parameters;

//    邏輯表名
    private final String logicTableName;

//    sql語句對象
    private final SQLStatement sqlStatement;

不是單表路由,就走多庫多表路由引擎,建立多庫多表路由對象

routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
/**
 * 混合多庫表路由引擎.
 * 
 * @author gaohongtao
 * @author zhangliang
 */
@RequiredArgsConstructor
@Slf4j
public final class ComplexRoutingEngine implements RoutingEngine {
    
    private final ShardingRule shardingRule;
    
    private final List<Object> parameters;
    
    private final Collection<String> logicTables;
    
    private final SQLStatement sqlStatement;
    
return routingEngine.route();//tianhe TODO 笛卡爾積

這裏是路由邏輯,這裏有三種實現,一種是單表或者綁定表路由,一種是多庫多表路由,一種是笛卡爾積路由

 

單表或者綁定表路由

@Override
    public RoutingResult route() {
//        根據邏輯表名得到表規則配置對象
        TableRule tableRule = shardingRule.getTableRule(logicTableName);
//        根據表規則配置對象得到數據源集合
        Collection<String> routedDataSources = routeDataSources(tableRule);
        Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
        for (String each : routedDataSources) {
            routedMap.put(each, routeTables(tableRule, each));
        }
        return generateRoutingResult(tableRule, routedMap);
    }
  根據邏輯表名得到表規則配置對象
        TableRule tableRule = shardingRule.getTableRule(logicTableName);
/**
     * 根據邏輯表名稱查找分片規則.
     *
     * @param logicTableName 邏輯表名稱
     * @return 該邏輯表的分片規則
     */
    public TableRule getTableRule(final String logicTableName) {
//        根據邏輯表返回表規則配置對象
        Optional<TableRule> tableRule = tryFindTableRule(logicTableName);
        if (tableRule.isPresent()) {
            return tableRule.get();
        }
//        若是默認數據源不爲空就根據默認數據源建立表配置規則對象
        if (dataSourceRule.getDefaultDataSource().isPresent()) {
            return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule);
        }
        throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName);
    }
//        若是默認數據源不爲空就根據默認數據源建立表配置規則對象
        if (dataSourceRule.getDefaultDataSource().isPresent()) {
            return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule);
        }
//    根據默認數據源建立部分庫數據分片策略,數據表不分表分片策略對象,並建立表配置規則對象進行裝載
    private TableRule createTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) {
        Map<String, DataSource> defaultDataSourceMap = new HashMap<>(1);
        defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get());
        return TableRule.builder(logicTableName)
                .dataSourceRule(new DataSourceRule(defaultDataSourceMap))
                .databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm()))
                .tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build();
    }

返回到這裏

@Override
    public RoutingResult route() {
//        根據邏輯表名得到表規則配置對象
        TableRule tableRule = shardingRule.getTableRule(logicTableName);
//        根據表規則配置對象得到數據源集合
        Collection<String> routedDataSources = routeDataSources(tableRule);
        Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
        for (String each : routedDataSources) {
            routedMap.put(each, routeTables(tableRule, each));
        }
        return generateRoutingResult(tableRule, routedMap);
    }
//        根據表規則配置對象得到數據源集合
        Collection<String> routedDataSources = routeDataSources(tableRule);

根據分片列獲取分片值

getShardingValues(strategy.getShardingColumns());
//        根據真實的數據源名稱和分片值計算靜態分片
        Collection<String> result = strategy.doStaticSharding(tableRule.getActualDatasourceNames(), shardingValues);
/**
     * 計算靜態分片.
     *
     * @param availableTargetNames 全部的可用分片資源集合
     * @param shardingValues 分片值集合
     * @return 分庫後指向的數據源名稱集合
     */
    public Collection<String> doStaticSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
//        若是沒有解析到傳入的數據源分片值,要走全庫路由
        if (shardingValues.isEmpty()) {
            result.addAll(availableTargetNames);
        } else {
//            若是傳入分片值,根據分片值去獲取具體的數據源
            result.addAll(doSharding(shardingValues, availableTargetNames));
        }
        return result;
    }

注意上面的數據庫路由的默認實現,若是不傳入數據庫分片值會走全庫路由的,數據量大的話是會影響性能的,因此建議必需要傳入分片值,阿里的TDDL這裏的實現是直接報錯的。

 

//            若是傳入分片值,根據分片值去獲取具體的數據源
            result.addAll(doSharding(shardingValues, availableTargetNames));
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {
//        若是沒分片
        if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
            return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
        }
//        若是按一個分片值分片
        if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
            SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
            ShardingValue shardingValue = shardingValues.iterator().next();
            switch (shardingValue.getType()) {
                case SINGLE:
//                    = 元算符分片
                    return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
                case LIST:
//                    in運算符分片
                    return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
                case RANGE:
//                    between運算符分片
                    return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
                default:
//                    如今只支持這三種運算符分片
                    throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());
            }
        }
//        若是是多個分片值
        if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {
            return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
        }
//        其餘方式的分片不支持
        throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());
    }

返回到這裏

@Override
    public RoutingResult route() {
//        根據邏輯表名得到表規則配置對象
        TableRule tableRule = shardingRule.getTableRule(logicTableName);
//        根據表規則配置對象得到數據源集合
        Collection<String> routedDataSources = routeDataSources(tableRule);
        Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
        for (String each : routedDataSources) {
            routedMap.put(each, routeTables(tableRule, each));
        }
        return generateRoutingResult(tableRule, routedMap);
    }

根據數據源和表配置規則組裝路由map

routedMap.put(each, routeTables(tableRule, each));

 

下面這個方法是獲取路由的表的集合

private Collection<String> routeTables(final TableRule tableRule, final String routedDataSource) {
//        獲取表分片策略
        TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule);
//        獲取分片值
        List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns())
                : getShardingValues(strategy.getShardingColumns());//doDynamicSharding
//        若是是動態分片走動態分片,若是是靜態分片走靜態分片
        Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues) : strategy.doStaticSharding(tableRule.getActualTableNames(routedDataSource), shardingValues);
        Preconditions.checkState(!result.isEmpty(), "no table route info");
        return result;
    }
/**
 * 計算動態分片.
 *
 * @param shardingValues 分片值集合
 * @return 分庫後指向的分片資源集合
 */
public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) {//doDynamicSharding
    Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value.");
    Collection<String> availableTargetNames = Collections.emptyList();
    Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
    result.addAll(doSharding(shardingValues, availableTargetNames));
    return result;
}

返回到這裏

@Override
    public RoutingResult route() {
//        根據邏輯表名得到表規則配置對象
        TableRule tableRule = shardingRule.getTableRule(logicTableName);
//        根據表規則配置對象得到數據源集合
        Collection<String> routedDataSources = routeDataSources(tableRule);
        Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
        for (String each : routedDataSources) {
            routedMap.put(each, routeTables(tableRule, each));
        }
        return generateRoutingResult(tableRule, routedMap);
    }
//        生成路由結果
        return generateRoutingResult(tableRule, routedMap);
private RoutingResult generateRoutingResult(final TableRule tableRule, final Map<String, Collection<String>> routedMap) {
        RoutingResult result = new RoutingResult();
//        遍歷roadMap,roadMap裏面key值存儲的是數據源名稱,value值是物理數據表集合
        for (Entry<String, Collection<String>> entry : routedMap.entrySet()) {
//            獲取最下數據單元,每一個數據單元是一個DataNode
            Collection<DataNode> dataNodes = tableRule.getActualDataNodes(entry.getKey(), entry.getValue());
            for (DataNode each : dataNodes) {
//                組裝數據表單元裝載到路由結果中
                result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName()));
            }
        }
        return result;
    }

 

 

數據模型

 

/**
 * SQL路由結果.
 * 
 * @author gaohongtao
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
public final class SQLRouteResult {
    
//    sql語句對象
    private final SQLStatement sqlStatement;
    
//    最小sql執行單元集合
    private final Set<SQLExecutionUnit> executionUnits = new LinkedHashSet<>();
    
    private final List<Number> generatedKeys = new LinkedList<>();
}
/**
 * SQL最小執行單元.
 * 
 * @author gaohongtao
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class SQLExecutionUnit {
    
//    具體的數據源
    private final String dataSource;
    
//    具體要執行的物理sql語句
    private final String sql;
}
/**
 * 路由表單元.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class TableUnit {
    
//    數據源名
    private final String dataSourceName;
    
//    邏輯表名
    private final String logicTableName;
    
//    物理表名
    private final String actualTableName;
}
/**
 * 路由表單元集合.
 * 
 * @author zhangliang
 */
@Getter
@ToString
public final class TableUnits {
    
//    路由表單元集合
    private final List<TableUnit> tableUnits = new LinkedList<>();
/**
 *  路由結果.
 * 
 * @author zhangliang
 */
@Getter
public class RoutingResult {
    
//    表路由單元集合
    private final TableUnits tableUnits = new TableUnits();
    
/**
 * 路由表單元.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class TableUnit {

//    數據源名
    private final String dataSourceName;

//    邏輯表名
    private final String logicTableName;

//    物理表名
    private final String actualTableName;
}
/**
 * 分庫分表數據單元.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class DataNode {
    
    private static final String DELIMITER = ".";
    
    private final String dataSourceName;//數據庫名
    
    private final String tableName;//表名

 

 

說到最後

 

以上介紹,僅供參考。

相關文章
相關標籤/搜索