在產品初期快速迭代的過程當中,每每爲了快速上線而佔據市場,在後端開發的過程當中每每不會過多的考慮分佈式和微服務,每每會將後端服務作成一個單體應用,而數據庫也是同樣,最初會把全部的業務數據都放到一個數據庫中,即所謂的單實例數據庫。隨着業務的迅速發展,將全部數據都放在一個數據庫中已經不足以支撐業務發展的須要。此時,就會對系統進行分佈式改造,而數據庫業務進行分庫分表的拆分。那麼,問題來了,如何更好的訪問和管理拆分後的數據庫呢?業界已經有不少成熟的解決方案,其中,一個很是優秀的解決方案就是:Apache ShardingSphere。今天,咱們就從源碼級別來共同探討下sharding-jdbc的核心源碼。java
Sharding-Jdbc 是一個輕量級的分庫分表框架,使用時最關鍵的是配製分庫分表策略,其他的和使用普通的 MySQL 驅動同樣,幾乎不用改代碼。例以下面的代碼片斷。mysql
try(DataSource dataSource = ShardingDataSourceFactory.createDataSource( createDataSourceMap(), shardingRuleConfig, new Properties()) { Connection connection = dataSource.getConnection(); ... }
咱們在程序中拿到Connection對象後,就能夠像使用普通的JDBC同樣來使用sharding-jdbc操做數據庫了。面試
sharding-jdbc ├── sharding-jdbc-core 重寫DataSource/Connection/Statement/ResultSet四大對象 └── sharding-jdbc-orchestration 配置中心 sharding-core ├── sharding-core-api 接口和配置類 ├── sharding-core-common 通用分片策略實現... ├── sharding-core-entry SQL解析、路由、改寫,核心類BaseShardingEngine ├── sharding-core-route SQL路由,核心類StatementRoutingEngine ├── sharding-core-rewrite SQL改寫,核心類ShardingSQLRewriteEngine ├── sharding-core-execute SQL執行,核心類ShardingExecuteEngine └── sharding-core-merge 結果合併,核心類MergeEngine shardingsphere-sql-parser ├── shardingsphere-sql-parser-spi SQLParserEntry,用於初始化SQLParser ├── shardingsphere-sql-parser-engine SQL解析,核心類SQLParseEngine ├── shardingsphere-sql-parser-relation └── shardingsphere-sql-parser-mysql MySQL解析器,核心類MySQLParserEntry和MySQLParser shardingsphere-underlying 基礎接口和api ├── shardingsphere-rewrite SQLRewriteEngine接口 ├── shardingsphere-execute QueryResult查詢結果 └── shardingsphere-merge MergeEngine接口 shardingsphere-spi SPI加載工具類 sharding-transaction ├── sharding-transaction-core 接口ShardingTransactionManager,SPI加載 ├── sharding-transaction-2pc 實現類XAShardingTransactionManager └── sharding-transaction-base 實現類SeataATShardingTransactionManager
全部的一切都從 ShardingDataSourceFactory 開始的,建立了一個 ShardingDataSource 的分片數據源。除了 ShardingDataSource(分片數據源),在 Sharding-Sphere 中還有 MasterSlaveDataSourceFactory(主從數據源)、EncryptDataSourceFactory(脫敏數據源)。sql
public static DataSource createDataSource( final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException { return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props); }
說明: 本文主要以 ShardingDataSource 爲切入點分析 Sharding-Sphere 是如何對 JDBC 四大對象 DataSource、Connection、Statement、ResultSet 進行封裝的。數據庫
這裏,涉及到兩個比較重要的接口,一個是DataSource,一個是Connection。咱們首先來看下它們的類圖。後端
DataSource
設計模式
Connection
api
DataSource 和 Connection 都比較簡單,沒有處理過多的邏輯,只是 dataSourceMap, shardingRule 進行簡單的封裝。bash
ShardingDataSource 持有對數據源和分片規則,能夠經過 getConnection 方法獲取 ShardingConnection 鏈接。服務器
private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext( dataSourceMap, shardingRule, props, getDatabaseType()); @Override public final ShardingConnection getConnection() { return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get()); }
ShardingConnection 能夠建立 Statement 和 PrepareStatement 兩種運行方式,以下代碼所示。
@Override public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability); }
說明: ShardingConnection 主要是將建立 ShardingStatement 和 ShardingPreparedStatement 兩個對象,主要的執行邏輯都在 Statement 對象中。另外,ShardingConnection 還有兩個重要的功能,一個是獲取真正的數據庫鏈接,一個是事務提交功能。
Statement 相對來講比較複雜,由於它都是 JDBC 的真正執行器,全部邏輯都封裝在 Statement 中。咱們來看下Statement的類圖
對於Statement,我就不作過對的描述了,相信使用過JDBC的小夥伴,對Statement都不陌生了。
ResultSet類圖以下所示。
咱們從源碼中能夠看出:ShardingResultSet 只是對 MergedResult 的簡單封裝。
private final MergedResult mergeResultSet; @Override public boolean next() throws SQLException { return mergeResultSet.next(); }
ShardingStatement 內部有三個核心的類,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改寫;一是 StatementExecutor 進行 SQL 執行;最後調用 MergeEngine 對結果進行合併處理。
private final ShardingConnection connection; private final StatementExecutor statementExecutor; public ShardingStatement(final ShardingConnection connection) { this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection); }
ShardingStatement 內部執行 SQL 委託給了 statementExecutor。
(1)executeQuery 執行過程
@Override public ResultSet executeQuery(final String sql) throws SQLException { ResultSet result; try { clearPrevious(); // 1. SQL 解析、路由、改寫,最終生成 SQLRouteResult shard(sql); // 2. 生成執行計劃 SQLRouteResult -> StatementExecuteUnit initStatementExecutor(); // 3. statementExecutor.executeQuery() 執行任務 MergeEngine mergeEngine = MergeEngineFactory.newInstance( connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), statementExecutor.executeQuery()); // 4. 結果合併 result = getResultSet(mergeEngine); } finally { currentResultSet = null; } currentResultSet = result; return result; }
(2)SQL 路由(包括 SQL 解析、路由、改寫)
private SQLRouteResult sqlRouteResult; private void shard(final String sql) { ShardingRuntimeContext runtimeContext = connection.getRuntimeContext(); SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine( runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine()); sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList()); }
SimpleQueryShardingEngine 進行 SQL 路由(包括 SQL 解析、路由、改寫),生成 SQLRouteResult,當 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 後,剩下的執行任務就所有交給 StatementExecutor 完成。
StatementExecutor 內部封裝了 SQL 任務的執行過程,包括:SqlExecutePrepareTemplate 類生成執行計劃 StatementExecuteUnit,以及 SQLExecuteTemplate 用於執行 StatementExecuteUnit。
AbstractStatementExecutor 類中重要的屬性:
// SQLExecutePrepareTemplate用於生成執行計劃StatementExecuteUnit private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate; // 保存生成的執行計劃StatementExecuteUnit private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>(); // SQLExecuteTemplate用於執行StatementExecuteUnit private final SQLExecuteTemplate sqlExecuteTemplate; // 保存查詢結果 private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
// 執行前清理狀態 private void clearPrevious() throws SQLException { statementExecutor.clear(); } // 執行時初始化 private void initStatementExecutor() throws SQLException { statementExecutor.init(sqlRouteResult); replayMethodForStatements(); }
這裏,須要注意的是: StatementExecutor 是有狀態的,每次執行前都要調用 statementExecutor.clear() 清理上一次執行的狀態,並調用 statementExecutor.init() 從新初始化。
statementExecutor.init() 初始化主要是生成執行計劃 StatementExecuteUnit。
public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatementContext(routeResult.getSqlStatementContext()); getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); cacheStatements(); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups( final Collection<RouteUnit> routeUnits) throws SQLException { return getSqlExecutePrepareTemplate().getExecuteUnitGroups( routeUnits, new SQLExecutePrepareCallback() { // 獲取鏈接 @Override public List<Connection> getConnections( final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return StatementExecutor.super.getConnection().getConnections( connectionMode, dataSourceName, connectionSize); } // 生成執行計劃RouteUnit -> StatementExecuteUnit @Override public StatementExecuteUnit createStatementExecuteUnit( final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit( routeUnit, connection.createStatement( getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()), connectionMode); } }); }
SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一個工具類,專門用於生成執行計劃,將 RouteUnit 轉化爲 StatementExecuteUnit。同時還提供了另外一個工具類 SQLExecuteTemplate 用於執行 StatementExecuteUnit,在任務執行時咱們會看到這個類。
public List<QueryResult> executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(sql, statement, connectionMode); } }; // 執行StatementExecuteUnit return executeCallback(executeCallback); } // sqlExecuteTemplate 執行 executeGroups(即StatementExecuteUnit) protected final <T> List<T> executeCallback( final SQLExecuteCallback<T> executeCallback) throws SQLException { // 執行全部的任務 StatementExecuteUnit List<T> result = sqlExecuteTemplate.executeGroup( (Collection) executeGroups, executeCallback); refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext); return result; }
SqlExecuteTemplate 執行 StatementExecuteUnit 會回調 SQLExecuteCallback#executeSQL 方法,最終調用 getQueryResult 方法。
private QueryResult getQueryResult(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { ResultSet resultSet = statement.executeQuery(sql); getResultSets().add(resultSet); return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet); }
ConnectionMode 有兩種模式:內存限制(MEMORY_STRICTLY)和鏈接限制(CONNECTION_STRICTLY),若是一個鏈接執行多個 StatementExecuteUnit 則爲內存限制(MEMORY_STRICTLY),採用流式處理,即 StreamQueryResult ,反之則爲鏈接限制(CONNECTION_STRICTLY),此時會將全部從 MySQL 服務器返回的數據都加載到內存中。特別是在 Sharding-Proxy 中特別有用,避免將代理服務器撐爆。
關注「 冰河技術 」微信公衆號,後臺回覆 「設計模式」 關鍵字領取《深刻淺出Java 23種設計模式》PDF文檔。回覆「Java8」關鍵字領取《Java8新特性教程》PDF文檔。回覆「限流」關鍵字獲取《億級流量下的分佈式限流解決方案》PDF文檔,三本PDF均是由冰河原創並整理的超硬核教程,面試必備!!
好了,今天就聊到這兒吧!別忘了點個贊,給個在看和轉發,讓更多的人看到,一塊兒學習,一塊兒進步!!
若是你以爲冰河寫的還不錯,請微信搜索並關注「 冰河技術 」微信公衆號,跟冰河學習高併發、分佈式、微服務、大數據、互聯網和雲原生技術,「 冰河技術 」微信公衆號更新了大量技術專題,每一篇技術文章乾貨滿滿!很多讀者已經經過閱讀「 冰河技術 」微信公衆號文章,吊打面試官,成功跳槽到大廠;也有很多讀者實現了技術上的飛躍,成爲公司的技術骨幹!若是你也想像他們同樣提高本身的能力,實現技術能力的飛躍,進大廠,升職加薪,那就關注「 冰河技術 」微信公衆號吧,天天更新超硬核技術乾貨,讓你對如何提高技術能力再也不迷茫!