Sharding-JDBC Source Code Analysis (6): SQL Execution

In the previous article we analyzed SQL rewriting in sharding-jdbc. Today we look at SQL execution.

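Summary:

SQL execution consists of two main parts:

  • Create a database connection for each SQL rewrite result. The official documentation calls this creating execution units.
  • Execute the created execution units and send an event at each key step, such as the execution-start, execution-success, and execution-failure events. The execution engine only cares about sending events; it does not care who subscribes to them. (The above is taken from the official documentation.)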
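
The event flow in the second bullet can be sketched in plain Java. This is only an illustration under stated assumptions, not sharding-jdbc's code: `MiniEventBus`, `ExecEvent`, `Phase`, and `runWithEvents` are hypothetical names, and a hand-rolled subscriber list stands in for a real event bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical sketch: the executor posts events and never inspects who subscribed.
final class ExecutionEventSketch {

    enum Phase { BEFORE_EXECUTE, EXECUTE_SUCCESS, EXECUTE_FAILURE }

    // Immutable event: which SQL, and which phase of its execution.
    static final class ExecEvent {
        final String sql;
        final Phase phase;

        ExecEvent(String sql, Phase phase) {
            this.sql = sql;
            this.phase = phase;
        }
    }

    // Stand-in for an event bus: a plain list of subscribers.
    static final class MiniEventBus {
        private final List<Consumer<ExecEvent>> subscribers = new ArrayList<>();

        void register(Consumer<ExecEvent> subscriber) {
            subscribers.add(subscriber);
        }

        void post(ExecEvent event) {
            subscribers.forEach(s -> s.accept(event));
        }
    }

    // Posts a "before" event, runs the task, then posts success or failure.
    // Returns null on failure, mirroring the "swallow and report" style.
    static <T> T runWithEvents(MiniEventBus bus, String sql, Callable<T> task) {
        bus.post(new ExecEvent(sql, Phase.BEFORE_EXECUTE));
        try {
            T result = task.call();
            bus.post(new ExecEvent(sql, Phase.EXECUTE_SUCCESS));
            return result;
        } catch (Exception ex) {
            bus.post(new ExecEvent(sql, Phase.EXECUTE_FAILURE));
            return null;
        }
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        List<Phase> seen = new ArrayList<>();
        bus.register(e -> seen.add(e.phase));
        runWithEvents(bus, "SELECT 1", () -> 1);
        runWithEvents(bus, "SELECT bad", () -> { throw new IllegalStateException("boom"); });
        System.out.println(seen);
    }
}
```

Running `main` prints `[BEFORE_EXECUTE, EXECUTE_SUCCESS, BEFORE_EXECUTE, EXECUTE_FAILURE]`. The executing side only calls `post`; adding or removing subscribers requires no change to it.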

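Main classes:

  • ExecutorEngine: the SQL execution engine. It maintains an internal thread pool (default corePoolSize=100), executes SQL asynchronously, and sends event notifications.
  • StatementExecutor: the executor that runs static Statement requests across multiple threads. (That description is a mouthful: it wraps the SQL that ExecutorEngine needs to run and calls ExecutorEngine to execute it.)
  • ShardingDataSource: the sharded data source.
  • ShardingConnection: the sharded database connection.
  • ShardingStatement: the static Statement object that supports sharding.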

Execution flow:

  • Initialize ShardingDataSource. First, here is the whole test:
/**
 * @Author serlain
 * @Date 2018/11/14 11:21 PM
 */
public class MyShardingStatementTest extends AbstractShardingDatabaseOnlyDBUnitTest {

    private ShardingDataSource shardingDataSource;

    private static String sql = "SELECT o.order_id FROM t_order o WHERE o.order_id = 4";

    @Before
    public void init() throws SQLException {
        shardingDataSource = getShardingDataSource();
    }


    @Test
    public void testselect() throws SQLException {
        try (
                Connection connection = shardingDataSource.getConnection();
                Statement stmt = connection.createStatement();
                ResultSet resultSet = stmt.executeQuery(sql)) {
            assertTrue(resultSet.next());
            assertThat(resultSet.getLong(1), is(40L));
        }

    }
}

protected final ShardingDataSource getShardingDataSource() {
        if (null != shardingDataSource && !isShutdown) {
            return shardingDataSource;
        }
        isShutdown = false;
        DataSourceRule dataSourceRule = new DataSourceRule(createDataSourceMap("dataSource_%s"));
        TableRule orderTableRule = TableRule.builder("t_order").dataSourceRule(dataSourceRule).actualTables(Lists.newArrayList("t_order_0", "t_order_1")).generateKeyColumn("order_id", IncrementKeyGenerator.class).build();
        ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule).tableRules(Arrays.asList(orderTableRule))
                .databaseShardingStrategy(new DatabaseShardingStrategy(Collections.singletonList("order_id"), new MultipleKeysModuloDatabaseShardingAlgorithm()))
                .tableShardingStrategy(new TableShardingStrategy("order_id", new OrderShardingAlgorithm())).build();
        shardingDataSource = new ShardingDataSource(shardingRule);
        return shardingDataSource;
    }

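Next, look at the ShardingDataSource constructor:

ShardingRule: the sharding rule, defined externally by the user. ExecutorEngine: the execution engine, initialized internally. ShardingContext: the runtime context of the data source. This class is key: it ties together everything needed while a SQL statement executes, and other components fetch what they need from it.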

public ShardingDataSource(final ShardingRule shardingRule) {
        this(shardingRule, new Properties());
    }
    
    public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
        Preconditions.checkNotNull(shardingRule);
        Preconditions.checkNotNull(props);
        shardingProperties = new ShardingProperties(props);
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        executorEngine = new ExecutorEngine(executorSize);
        try {
            shardingContext = new ShardingContext(shardingRule, DatabaseType.valueFrom(getDatabaseProductName(shardingRule)), executorEngine);
        } catch (final SQLException ex) {
            throw new ShardingJdbcException(ex);
        }
    }
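
The constructor reads the executor size from user-supplied properties through a typed lookup. A minimal sketch of that idea using only `java.util.Properties` follows; `TypedPropertySketch`, the key `executor.size`, and the fallback value `8` are illustrative assumptions, not values taken from sharding-jdbc.

```java
import java.util.Properties;

// Hypothetical sketch of a typed property lookup with a default value.
final class TypedPropertySketch {

    // Returns the integer stored under key, or defaultValue when the key is absent.
    static int intValue(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // The key name and the default of 8 are assumptions for illustration only.
        System.out.println(intValue(props, "executor.size", 8));
        props.setProperty("executor.size", " 16 ");
        System.out.println(intValue(props, "executor.size", 8));
    }
}
```

This prints `8` and then `16`: an empty `Properties` object, as created by the single-argument constructor, falls back to the default.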
  • getConnection: returns a ShardingConnection
@Override
    public ShardingConnection getConnection() throws SQLException {
        MetricsContext.init(shardingProperties);
        return new ShardingConnection(shardingContext);
    }
  • Create the SQL statement: returns a ShardingStatement
@Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException {
        return new ShardingStatement(this, resultSetType, resultSetConcurrency);
    }
    public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) {
        this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.shardingConnection = shardingConnection;
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
    }
  • Execute the query. ShardingStatement overrides executeQuery:
@Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet result;
        try {
            // Covers both execution and merging; this article only analyzes execution, merging is left for the next article
            result = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), routeResult.getSqlStatement());
        } finally {
            setCurrentResultSet(null);
        }
        setCurrentResultSet(result);
        return result;
    }
    
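  • Create the execution units:

Based on the routing and rewrite result (the target dataSource), create a SQL Statement for each unit; build a StatementUnit (the mapping from a SQL statement to its Statement); then build the StatementExecutor (the SQL executor that runs those units).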

private StatementExecutor generateExecutor(final String sql) throws SQLException {
        clearPrevious();
        // Result of routing and rewriting
        routeResult = new StatementRoutingEngine(shardingConnection.getShardingContext()).route(sql);
        Collection<StatementUnit> statementUnits = new LinkedList<>();
        // Iterate over the smallest SQL execution units: get the connection by data source name and create a Statement on it
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            Statement statement = shardingConnection.getConnection(
                    each.getDataSource(), routeResult.getSqlStatement().getType()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            replayMethodsInvocation(statement);
            // Create the StatementUnit: the SQL-to-Statement mapping
            statementUnits.add(new StatementUnit(each, statement));
            routedStatements.add(statement);
        }
        // StatementExecutor: wraps the execution engine, the SQL type, and the statement units
        return new StatementExecutor(shardingConnection.getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), statementUnits);
    }
    
     
    /**
     * Get the database connection for the given data source name.
     *
     * @param dataSourceName data source name
     * @param sqlType SQL statement type
     * @return database connection
     * @throws SQLException SQL exception
     */
    public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
        // Return the cached connection if there is one
        Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType);
        if (connection.isPresent()) {
            return connection.get();
        }
        Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));
        // Look up the DataSource with the given name in the configured DataSourceRule
        DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);
        String realDataSourceName;
        // MasterSlaveDataSource: resolve the actual data source according to the SQL type
        if (dataSource instanceof MasterSlaveDataSource) {
            dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
            realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
        } else {
            realDataSourceName = dataSourceName;
        }
        // Get the connection
        Connection result = dataSource.getConnection();
        MetricsContext.stop(metricsContext);
        connectionMap.put(realDataSourceName, result);
        replayMethodsInvocation(result);
        return result;
    }
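
The caching step at the top of `getConnection` amounts to one physical connection per real data source name, reused for later lookups. A minimal sketch under assumptions: `ConnectionCacheSketch` is a hypothetical class, a `Function` stands in for `DataSource.getConnection()`, and plain strings stand in for connections so the example runs without a database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: cache one connection-like object per data source name.
final class ConnectionCacheSketch<C> {

    private final Map<String, C> cache = new HashMap<>();
    private final Function<String, C> opener; // stand-in for opening a real connection
    private int opened;

    ConnectionCacheSketch(Function<String, C> opener) {
        this.opener = opener;
    }

    // Returns the cached connection, opening and caching a new one on first use.
    C getConnection(String dataSourceName) {
        C cached = cache.get(dataSourceName);
        if (cached != null) {
            return cached;
        }
        C fresh = opener.apply(dataSourceName);
        opened++;
        cache.put(dataSourceName, fresh);
        return fresh;
    }

    int openedCount() {
        return opened;
    }

    public static void main(String[] args) {
        ConnectionCacheSketch<String> conns = new ConnectionCacheSketch<>(name -> "conn-to-" + name);
        conns.getConnection("dataSource_0");
        conns.getConnection("dataSource_0"); // served from the cache
        conns.getConnection("dataSource_1");
        System.out.println(conns.openedCount());
    }
}
```

This prints `2`: the second lookup of `dataSource_0` reuses the cached object. Like the map in the original method, this sketch is not synchronized, so it assumes a single thread calls `getConnection` on a given instance.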
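  • Execute the created execution units:

  1. SQL is executed through a callback.
  2. Among the execution units, the first one runs synchronously on the calling thread while the rest run asynchronously on the thread pool; the caller then waits for all results. This improves execution efficiency.
  3. During execution, events are published through Guava's EventBus before execution, on failure, and on success. Subscribers receive these events and act on them. For example, the built-in best-effort delivery distributed transaction subscribes to them and adds retries when execution fails.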
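
The scheduling used by the `execute` method can be sketched with the standard library alone: submit every unit after the first to a pool, run the first on the calling thread, then collect the results in order. `FirstSyncRestAsyncSketch` and its generic `execute` helper are hypothetical; the real engine uses Guava's `ListenableFuture` rather than plain `Future`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: first unit on the caller thread, the rest on a pool.
final class FirstSyncRestAsyncSketch {

    static <I, O> List<O> execute(ExecutorService pool, List<I> inputs, Function<I, O> callback) {
        List<O> result = new ArrayList<>();
        if (inputs.isEmpty()) {
            return result;
        }
        Iterator<I> iterator = inputs.iterator();
        I first = iterator.next();
        // Submit the remaining units before doing any work on this thread.
        List<Future<O>> rest = new ArrayList<>();
        while (iterator.hasNext()) {
            final I each = iterator.next();
            Callable<O> task = () -> callback.apply(each);
            rest.add(pool.submit(task));
        }
        // The caller thread does useful work instead of just waiting.
        result.add(callback.apply(first));
        for (Future<O> future : rest) {
            try {
                result.add(future.get()); // block until each async unit finishes
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            } catch (ExecutionException ex) {
                throw new IllegalStateException(ex.getCause());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> tables = Arrays.asList("t_order_0", "t_order_1", "t_order_2");
            System.out.println(execute(pool, tables, t -> "rows from " + t));
        } finally {
            pool.shutdown();
        }
    }
}
```

The output is `[rows from t_order_0, rows from t_order_1, rows from t_order_2]`. When a query routes to a single unit, which is the common case for a sharding-key lookup, no task is submitted to the pool at all.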

/**
     * Execute a SQL query.
     *
     * @return list of result sets
     */
    public List<ResultSet> executeQuery() {
        Context context = MetricsContext.start("ShardingStatement-executeQuery");
        List<ResultSet> result;
        try {
            // Hand off to executorEngine for execution
            result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {
                // Callback: the actual execution logic is supplied by the caller
                @Override
                public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                    return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
                }
            });
        } finally {
            MetricsContext.stop(context);
        }
        return result;
    }
/**
     * Execute a Statement.
     *
     * @param sqlType SQL type
     * @param statementUnits collection of statement execution units
     * @param executeCallback execution callback
     * @param <T> return type
     * @return execution results
     */
    public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {
        return execute(sqlType, statementUnits, Collections.<List<Object>>emptyList(), executeCallback);
    }
private  <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        // Nothing to execute: return an empty list
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        
        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
        // Take the first unit to execute
        BaseStatementUnit firstInput = iterator.next();
        // Execute the remaining units asynchronously
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
        T firstOutput;
        List<T> restOutputs;
        try {
            // Execute the first unit synchronously
            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
            // Wait for the asynchronous results
            restOutputs = restFutures.get();
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        // Return the first result followed by the rest
        List<T> result = Lists.newLinkedList(restOutputs);
        result.add(0, firstOutput);
        return result;
    }

Asynchronous execution logic:

private <T> ListenableFuture<List<T>> asyncExecute(
            final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
        for (final BaseStatementUnit each : baseStatementUnits) {
            result.add(executorService.submit(new Callable<T>() {
                
                @Override
                public T call() throws Exception {
                    return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
                }
            }));
        }
        return Futures.allAsList(result);
    }
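
One detail in `asyncExecute` is easy to miss: `isExceptionThrown` and `dataMap` are read on the calling thread before the tasks are submitted, and `executeInternal` sets them again on whichever thread runs the task. Assuming those values are held in thread-local storage, which is what this read-then-set pattern suggests, a pool thread would not see them otherwise. A minimal sketch with hypothetical names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: hand a caller's thread-local value over to a pool thread.
final class ThreadLocalHandOffSketch {

    static final ThreadLocal<String> CONTEXT = ThreadLocal.withInitial(() -> "unset");

    // Returns what the worker thread observes in CONTEXT.
    static String readOnWorker(ExecutorService pool, boolean handOff) {
        final String captured = CONTEXT.get(); // read on the calling thread
        Callable<String> task = () -> {
            if (handOff) {
                CONTEXT.set(captured); // re-establish the value on the worker thread
            }
            try {
                return CONTEXT.get();
            } finally {
                CONTEXT.remove(); // do not leak state into the next pooled task
            }
        };
        try {
            return pool.submit(task).get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("caller-value");
            System.out.println(readOnWorker(pool, false));
            System.out.println(readOnWorker(pool, true));
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

This prints `unset` and then `caller-value`: without the explicit hand-off the worker sees only the thread-local's initial value.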

Synchronous execution logic:

private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
        return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
    }

executeInternal:

private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, 
                          final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
        synchronized (baseStatementUnit.getStatement().getConnection()) {
            T result;
            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
            ExecutorDataMap.setDataMap(dataMap);
            List<AbstractExecutionEvent> events = new LinkedList<>();
            // No parameter sets: add a single before-execution event
            if (parameterSets.isEmpty()) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
            }
            // Otherwise add one before-execution event per parameter set
            for (List<Object> each : parameterSets) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
            // Publish the events; the EventBus is a singleton
            for (AbstractExecutionEvent event : events) {
                EventBusInstance.getInstance().post(event);
            }
            // Invoke executeCallback to execute the SQL
            try {
                result = executeCallback.execute(baseStatementUnit);
            } catch (final SQLException ex) {
                // Publish the execution-failure events
                for (AbstractExecutionEvent each : events) {
                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                    each.setException(Optional.of(ex));
                    EventBusInstance.getInstance().post(each);
                    ExecutorExceptionHandler.handleException(ex);
                }
                return null;
            }
            // Publish the execution-success events
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
                EventBusInstance.getInstance().post(each);
            }
            return result;
        }
    }

Finally:

One last plug: feel free to follow my official account, where I occasionally share thoughts on programming, investing, and life :)
