面試官問我:看過sharding-jdbc的源碼嗎?我吧啦吧啦說了一通!!

寫在前面

在產品初期快速迭代的過程當中,每每爲了快速上線而佔據市場,在後端開發的過程當中每每不會過多的考慮分佈式和微服務,每每會將後端服務作成一個單體應用,而數據庫也是同樣,最初會把全部的業務數據都放到一個數據庫中,即所謂的單實例數據庫。隨着業務的迅速發展,將全部數據都放在一個數據庫中已經不足以支撐業務發展的須要。此時,就會對系統進行分佈式改造,而數據庫業務進行分庫分表的拆分。那麼,問題來了,如何更好的訪問和管理拆分後的數據庫呢?業界已經有不少成熟的解決方案,其中,一個很是優秀的解決方案就是:Apache ShardingSphere。今天,咱們就從源碼級別來共同探討下sharding-jdbc的核心源碼。java

sharding-jdbc經典用法

Sharding-Jdbc 是一個輕量級的分庫分表框架,使用時最關鍵的是配製分庫分表策略,其他的和使用普通的 MySQL 驅動同樣,幾乎不用改代碼。例以下面的代碼片斷。mysql

try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(
    createDataSourceMap(), shardingRuleConfig, new Properties()) {
    Connection connection = dataSource.getConnection();
    ...
}

咱們在程序中拿到Connection對象後,就能夠像使用普通的JDBC同樣來使用sharding-jdbc操做數據庫了。面試

sharding-jdbc包結構

sharding-jdbc  
    ├── sharding-jdbc-core      重寫DataSource/Connection/Statement/ResultSet四大對象
    └── sharding-jdbc-orchestration        配置中心
sharding-core
    ├── sharding-core-api       接口和配置類	
    ├── sharding-core-common    通用分片策略實現...
    ├── sharding-core-entry     SQL解析、路由、改寫,核心類BaseShardingEngine
    ├── sharding-core-route     SQL路由,核心類StatementRoutingEngine
    ├── sharding-core-rewrite   SQL改寫,核心類ShardingSQLRewriteEngine
    ├── sharding-core-execute   SQL執行,核心類ShardingExecuteEngine
    └── sharding-core-merge     結果合併,核心類MergeEngine
shardingsphere-sql-parser 
    ├── shardingsphere-sql-parser-spi       SQLParserEntry,用於初始化SQLParser
    ├── shardingsphere-sql-parser-engine    SQL解析,核心類SQLParseEngine
    ├── shardingsphere-sql-parser-relation
    └── shardingsphere-sql-parser-mysql     MySQL解析器,核心類MySQLParserEntry和MySQLParser
shardingsphere-underlying           基礎接口和api
    ├── shardingsphere-rewrite      SQLRewriteEngine接口
    ├── shardingsphere-execute      QueryResult查詢結果
    └── shardingsphere-merge        MergeEngine接口
shardingsphere-spi                  SPI加載工具類
sharding-transaction
    ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加載		
    ├── sharding-transaction-2pc    實現類XAShardingTransactionManager
    └── sharding-transaction-base   實現類SeataATShardingTransactionManager

sharding-jdbc中的四大對象

全部的一切都從 ShardingDataSourceFactory 開始的,建立了一個 ShardingDataSource 的分片數據源。除了 ShardingDataSource(分片數據源),在 Sharding-Sphere 中還有 MasterSlaveDataSourceFactory(主從數據源)、EncryptDataSourceFactory(脫敏數據源)。sql

public static DataSource createDataSource(
        final Map<String, DataSource> dataSourceMap,
        final ShardingRuleConfiguration shardingRuleConfig,
        final Properties props) throws SQLException {
    return new ShardingDataSource(dataSourceMap,
               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}

說明: 本文主要以 ShardingDataSource 爲切入點分析 Sharding-Sphere 是如何對 JDBC 四大對象 DataSource、Connection、Statement、ResultSet 進行封裝的。數據庫

DataSource

這裏,涉及到兩個比較重要的接口,一個是DataSource,一個是Connection。咱們首先來看下它們的類圖。後端

  • DataSource
    在這裏插入圖片描述設計模式

  • Connection
    在這裏插入圖片描述api

DataSource 和 Connection 都比較簡單,沒有處理過多的邏輯,只是 dataSourceMap, shardingRule 進行簡單的封裝。bash

ShardingDataSource 持有對數據源和分片規則,能夠經過 getConnection 方法獲取 ShardingConnection 鏈接。服務器

private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
                dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {
    return new ShardingConnection(getDataSourceMap(), runtimeContext,
            TransactionTypeHolder.get());
}

Connection

ShardingConnection 能夠建立 Statement 和 PrepareStatement 兩種運行方式,以下代碼所示。

@Override
public Statement createStatement(final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    return new ShardingStatement(this, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability)
        throws SQLException {
    return new ShardingPreparedStatement(this, sql, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

說明: ShardingConnection 主要是將建立 ShardingStatement 和 ShardingPreparedStatement 兩個對象,主要的執行邏輯都在 Statement 對象中。另外,ShardingConnection 還有兩個重要的功能,一個是獲取真正的數據庫鏈接,一個是事務提交功能。

Statement

Statement 相對來講比較複雜,由於它都是 JDBC 的真正執行器,全部邏輯都封裝在 Statement 中。咱們來看下Statement的類圖

在這裏插入圖片描述

對於Statement,我就不作過對的描述了,相信使用過JDBC的小夥伴,對Statement都不陌生了。

ResultSet

ResultSet類圖以下所示。

在這裏插入圖片描述

咱們從源碼中能夠看出:ShardingResultSet 只是對 MergedResult 的簡單封裝。

private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {
    return mergeResultSet.next();
}

sharding-jdbc-core核心分析

ShardingStatement 內部有三個核心的類,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改寫;一是 StatementExecutor 進行 SQL 執行;最後調用 MergeEngine 對結果進行合併處理。

ShardingStatement

初始化

private final ShardingConnection connection;
private final StatementExecutor statementExecutor;

public ShardingStatement(final ShardingConnection connection) {
    this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection connection, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    super(Statement.class);
    this.connection = connection;
    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
            resultSetHoldability, connection);
}

ShardingStatement 內部執行 SQL 委託給了 statementExecutor。

執行

(1)executeQuery 執行過程

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        // 1. SQL 解析、路由、改寫,最終生成 SQLRouteResult
        shard(sql);
        // 2. 生成執行計劃 SQLRouteResult -> StatementExecuteUnit
        initStatementExecutor();
        // 3. statementExecutor.executeQuery() 執行任務
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(
                connection.getRuntimeContext().getDatabaseType(),
                connection.getRuntimeContext().getRule(), sqlRouteResult,
                connection.getRuntimeContext().getMetaData().getRelationMetas(),
                statementExecutor.executeQuery());
        // 4. 結果合併
        result = getResultSet(mergeEngine);
    } finally {
        currentResultSet = null;
    }
    currentResultSet = result;
    return result;
}

(2)SQL 路由(包括 SQL 解析、路由、改寫)

private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
            runtimeContext.getRule(), runtimeContext.getProps(),
            runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

SimpleQueryShardingEngine 進行 SQL 路由(包括 SQL 解析、路由、改寫),生成 SQLRouteResult,當 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 後,剩下的執行任務就所有交給 StatementExecutor 完成。

StatementExecutor

StatementExecutor 內部封裝了 SQL 任務的執行過程,包括:SqlExecutePrepareTemplate 類生成執行計劃 StatementExecuteUnit,以及 SQLExecuteTemplate 用於執行 StatementExecuteUnit。

類結構

在這裏插入圖片描述

重要屬性

AbstractStatementExecutor 類中重要的屬性:

// SQLExecutePrepareTemplate用於生成執行計劃StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保存生成的執行計劃StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
            new LinkedList<>();

// SQLExecuteTemplate用於執行StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保存查詢結果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();

生成執行計劃

// 執行前清理狀態
private void clearPrevious() throws SQLException {
    statementExecutor.clear();
}
// 執行時初始化
private void initStatementExecutor() throws SQLException {
    statementExecutor.init(sqlRouteResult);
    replayMethodForStatements();
}

這裏,須要注意的是: StatementExecutor 是有狀態的,每次執行前都要調用 statementExecutor.clear() 清理上一次執行的狀態,並調用 statementExecutor.init() 從新初始化。

statementExecutor.init() 初始化主要是生成執行計劃 StatementExecuteUnit。

public void init(final SQLRouteResult routeResult) throws SQLException {
    setSqlStatementContext(routeResult.getSqlStatementContext());
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();
}

private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
        final Collection<RouteUnit> routeUnits) throws SQLException {
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
            routeUnits, new SQLExecutePrepareCallback() {
                // 獲取鏈接
                @Override
                public List<Connection> getConnections(
                        final ConnectionMode connectionMode,
                        final String dataSourceName, final int connectionSize)
                        throws SQLException {
                    return StatementExecutor.super.getConnection().getConnections(
                            connectionMode, dataSourceName, connectionSize);
                }

                // 生成執行計劃RouteUnit -> StatementExecuteUnit
                @Override
                public StatementExecuteUnit createStatementExecuteUnit(
                        final Connection connection, final RouteUnit routeUnit,
                        final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(
                            routeUnit, connection.createStatement(
                            getResultSetType(), getResultSetConcurrency(),
                            getResultSetHoldability()), connectionMode);
                }
            });
}

SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一個工具類,專門用於生成執行計劃,將 RouteUnit 轉化爲 StatementExecuteUnit。同時還提供了另外一個工具類 SQLExecuteTemplate 用於執行 StatementExecuteUnit,在任務執行時咱們會看到這個類。

任務執行

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<QueryResult> executeCallback = 
        new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement,
                final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(sql, statement, connectionMode);
        }
    };
    // 執行StatementExecuteUnit
    return executeCallback(executeCallback);
}

// sqlExecuteTemplate 執行 executeGroups(即StatementExecuteUnit)
protected final <T> List<T> executeCallback(
        final SQLExecuteCallback<T> executeCallback) throws SQLException {
    // 執行全部的任務 StatementExecuteUnit
    List<T> result = sqlExecuteTemplate.executeGroup(
            (Collection) executeGroups, executeCallback);
    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
    return result;
}

SqlExecuteTemplate 執行 StatementExecuteUnit 會回調 SQLExecuteCallback#executeSQL 方法,最終調用 getQueryResult 方法。

private QueryResult getQueryResult(final String sql, final Statement statement,
        final ConnectionMode connectionMode) throws SQLException {
    ResultSet resultSet = statement.executeQuery(sql);
    getResultSets().add(resultSet);
    return ConnectionMode.MEMORY_STRICTLY == connectionMode
            ? new StreamQueryResult(resultSet)
            : new MemoryQueryResult(resultSet);
}

ConnectionMode 有兩種模式:內存限制(MEMORY_STRICTLY)和鏈接限制(CONNECTION_STRICTLY),若是一個鏈接執行多個 StatementExecuteUnit 則爲內存限制(MEMORY_STRICTLY),採用流式處理,即 StreamQueryResult ,反之則爲鏈接限制(CONNECTION_STRICTLY),此時會將全部從 MySQL 服務器返回的數據都加載到內存中。特別是在 Sharding-Proxy 中特別有用,避免將代理服務器撐爆。

重磅福利

關注「 冰河技術 」微信公衆號,後臺回覆 「設計模式」 關鍵字領取《深刻淺出Java 23種設計模式》PDF文檔。回覆「Java8」關鍵字領取《Java8新特性教程》PDF文檔。回覆「限流」關鍵字獲取《億級流量下的分佈式限流解決方案》PDF文檔,三本PDF均是由冰河原創並整理的超硬核教程,面試必備!!

好了,今天就聊到這兒吧!別忘了點個贊,給個在看和轉發,讓更多的人看到,一塊兒學習,一塊兒進步!!

寫在最後

若是你以爲冰河寫的還不錯,請微信搜索並關注「 冰河技術 」微信公衆號,跟冰河學習高併發、分佈式、微服務、大數據、互聯網和雲原生技術,「 冰河技術 」微信公衆號更新了大量技術專題,每一篇技術文章乾貨滿滿!很多讀者已經經過閱讀「 冰河技術 」微信公衆號文章,吊打面試官,成功跳槽到大廠;也有很多讀者實現了技術上的飛躍,成爲公司的技術骨幹!若是你也想像他們同樣提高本身的能力,實現技術能力的飛躍,進大廠,升職加薪,那就關注「 冰河技術 」微信公衆號吧,天天更新超硬核技術乾貨,讓你對如何提高技術能力再也不迷茫!

相關文章
相關標籤/搜索