sharding-jdbc源碼解析之sql執行

sql執行源碼解析

本文轉自「天河聊技術」微信公衆號

 

找到這個方法

com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#execute

@Override
    public boolean execute() throws SQLException {
        try {
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
//            建立預編譯statement的sql執行器
            return new PreparedStatementExecutor(
                    getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute();
        } finally {
//            釋放內存
            clearBatch();
        }
    }

 

組裝預編譯對象執行單元集合

/**
 * 預編譯語句對象執行單元.
 *
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
public final class PreparedStatementUnit implements BaseStatementUnit {

//    sql執行單元
    private final SQLExecutionUnit sqlExecutionUnit;

//    預編譯對象
    private final PreparedStatement statement;
}
進入到route()方法
private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
//        執行sql路由邏輯並獲得路由結果並裝載支持靜態分片的預編譯statement對象
        setRouteResult(routingEngine.route(getParameters()));
//        遍歷最小sql執行單元
        for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
//            獲取sql類型
            SQLType sqlType = getRouteResult().getSqlStatement().getType();
            Collection<PreparedStatement> preparedStatements;
            if (SQLType.DDL == sqlType) {
//                若是是DDL,建立DDL的prepareStatement對象
                preparedStatements = generatePreparedStatementForDDL(each);
            } else {
//                DDL以外的語句建立prepareStatement對象
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
//            裝載路由的statement對象
            getRoutedStatements().addAll(preparedStatements);
            for (PreparedStatement preparedStatement : preparedStatements) {
                replaySetParameter(preparedStatement);
                result.add(new PreparedStatementUnit(each, preparedStatement));
            }
        }
        return result;
    }
//        遍歷最小sql執行單元
        for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
//            獲取sql類型
            SQLType sqlType = getRouteResult().getSqlStatement().getType();
//                若是是DDL,建立DDL的prepareStatement對象
                preparedStatements = generatePreparedStatementForDDL(each);
private Collection<PreparedStatement> generatePreparedStatementForDDL(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
        Collection<PreparedStatement> result = new LinkedList<>();
//        獲取能夠執行DDL語句的數據庫鏈接對象集合
        Collection<Connection> connections = getShardingConnection().getConnectionForDDL(sqlExecutionUnit.getDataSource());
        for (Connection each : connections) {
//            建立prepareStatement對象
            result.add(each.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()));
        }
        return result;
    }
//        獲取能夠執行DDL語句的數據庫鏈接對象集合
        Collection<Connection> connections = getShardingConnection().getConnectionForDDL(sqlExecutionUnit.getDataSource());
public Collection<Connection> getConnectionForDDL(final String dataSourceName) throws SQLException {
        final Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnectionForDDL", dataSourceName));
//        從分片規則的數據庫分片規則中獲取數據源
        DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);
        Collection<DataSource> dataSources = new LinkedList<>();
        if (dataSource instanceof MasterSlaveDataSource) {
            dataSources.add(((MasterSlaveDataSource) dataSource).getMasterDataSource());
            dataSources.addAll(((MasterSlaveDataSource) dataSource).getSlaveDataSources());
        } else {
            dataSources.add(dataSource);
        }
        Collection<Connection> result = new LinkedList<>();
        for (DataSource each : dataSources) {
//            根據數據源獲取數據庫鏈接
            Connection connection = each.getConnection();
            replayMethodsInvocation(connection);//從新調用調用過的方法動做
            result.add(connection);
        }
        MetricsContext.stop(metricsContext);
        return result;
    }
向上返回到這裏
private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
//        執行sql路由邏輯並獲得路由結果並裝載支持靜態分片的預編譯statement對象
        setRouteResult(routingEngine.route(getParameters()));
//        遍歷最小sql執行單元
        for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
//            獲取sql類型
            SQLType sqlType = getRouteResult().getSqlStatement().getType();
            Collection<PreparedStatement> preparedStatements;
            if (SQLType.DDL == sqlType) {
//                若是是DDL,建立DDL的prepareStatement對象
                preparedStatements = generatePreparedStatementForDDL(each);
            } else {
//                DDL以外的語句建立prepareStatement對象
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
//            裝載路由的statement對象
            getRoutedStatements().addAll(preparedStatements);
            for (PreparedStatement preparedStatement : preparedStatements) {
                replaySetParameter(preparedStatement);
                result.add(new PreparedStatementUnit(each, preparedStatement));
            }
        }
        return result;
    }
} else {
//                DDL以外的語句建立prepareStatement對象
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
        Optional<GeneratedKey> generatedKey = getGeneratedKey();
//        獲取數據庫鏈接
        Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
//        建立prepareStatement對象
        if (isReturnGeneratedKeys() || isReturnGeneratedKeys() && generatedKey.isPresent()) {
            return connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS);
        }
        return connection.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }

獲取數據庫連接對象

//        獲取數據庫鏈接
        Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
//        從緩存中獲取數據源鏈接
        Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType);
        if (connection.isPresent()) {
            return connection.get();
        }
        Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));
//        根據數據源名稱獲取數據源對象
        DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);
        String realDataSourceName;
        if (dataSource instanceof MasterSlaveDataSource) {
            dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
            realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
        } else {
            realDataSourceName = dataSourceName;
        }
        Connection result = dataSource.getConnection();
        MetricsContext.stop(metricsContext);
        connectionMap.put(realDataSourceName, result);
        replayMethodsInvocation(result);
        return result;
    }

向上返回到這裏

private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
//        執行sql路由邏輯並獲得路由結果並裝載支持靜態分片的預編譯statement對象
        setRouteResult(routingEngine.route(getParameters()));
//        遍歷最小sql執行單元
        for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
//            獲取sql類型
            SQLType sqlType = getRouteResult().getSqlStatement().getType();
            Collection<PreparedStatement> preparedStatements;
            if (SQLType.DDL == sqlType) {
//                若是是DDL,建立DDL的prepareStatement對象
                preparedStatements = generatePreparedStatementForDDL(each);
            } else {
//                DDL以外的語句建立prepareStatement對象
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
            getRoutedStatements().addAll(preparedStatements);
            for (PreparedStatement preparedStatement : preparedStatements) {
                replaySetParameter(preparedStatement);
                result.add(new PreparedStatementUnit(each, preparedStatement));
            }
        }
        return result;
    }
//            裝載路由的statement對象
            getRoutedStatements().addAll(preparedStatements);

向上返回到這裏

@Override
    public boolean execute() throws SQLException {
        try {
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
//            建立預編譯statement的sql執行器
            return new PreparedStatementExecutor(
                    getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute();
        } finally {
//            釋放內存
            clearBatch();
        }
    }

進入到這個方法

com.dangdang.ddframe.rdb.sharding.executor.type.prepared.PreparedStatementExecutor#execute 執行sql請求函數

/**
 * 執行SQL請求.
 * 
 * @return true表示執行DQL, false表示執行的DML
 */
public boolean execute() {
    Context context = MetricsContext.start("ShardingPreparedStatement-execute");
    try {
        List<Boolean> result = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback<Boolean>() {
            
            @Override
            public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return ((PreparedStatement) baseStatementUnit.getStatement()).execute();
            }
        });
        if (null == result || result.isEmpty() || null == result.get(0)) {
            return false;
        }
        return result.get(0);
    } finally {
        MetricsContext.stop(context);
    }
}

進入到執行prepareStatement對象的方法

com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#executePreparedStatement

/**
 * Executes prepared statements.
 *
 * @param sqlType SQL type
 * @param preparedStatementUnits collection of statement execution units
 * @param parameters parameter list
 * @param executeCallback callback that performs the execution
 * @param <T> type of the returned values
 * @return execution results
 */
public <T> List<T> executePreparedStatement(
        final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
    // The shared execute() takes a list of parameter lists; wrap the single list as its only element.
    List<List<Object>> singleParameterSet = Collections.singletonList(parameters);
    return execute(sqlType, preparedStatementUnits, singleParameterSet, executeCallback);
}

com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#execute

進入sql執行引擎的這個方法

private  <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
//        得到一個sql語句執行單元
        BaseStatementUnit firstInput = iterator.next();
//        異步多線程去執行->
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
        T firstOutput;
        List<T> restOutputs;
        try {
//            同步執行->
            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
//            獲取執行結果
            restOutputs = restFutures.get();
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        List<T> result = Lists.newLinkedList(restOutputs);
        result.add(0, firstOutput);
        return result;
    }
//        異步多線程去執行->
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);

進入到這個方法

com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#asyncExecute

private <T> ListenableFuture<List<T>> asyncExecute(
            final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
//        是否有異常出現
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
//        執行數據是多線程安全的
        final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
        for (final BaseStatementUnit each : baseStatementUnits) {
//            線程分發執行
            result.add(executorService.submit(new Callable<T>() {
                
                @Override
                public T call() throws Exception {
                    return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
                }
            }));
        }

        return Futures.allAsList(result);
    }

進入這個方法

return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, 
                          final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
//        同一個數據源是串行執行的
        synchronized (baseStatementUnit.getStatement().getConnection()) {
            T result;
            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
            ExecutorDataMap.setDataMap(dataMap);
            List<AbstractExecutionEvent> events = new LinkedList<>();
            if (parameterSets.isEmpty()) {
//                添加執行事件-》
                events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
            }
            for (List<Object> each : parameterSets) {
//                添加執行事件
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
            for (AbstractExecutionEvent event : events) {
//                這裏是事件總線實現,發佈事件
                EventBusInstance.getInstance().post(event);
            }
            try {
//                回調函數獲取回調結果
                result = executeCallback.execute(baseStatementUnit);
            } catch (final SQLException ex) {
//                執行失敗,更新事件,發佈執行失敗的事件
                for (AbstractExecutionEvent each : events) {
                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                    each.setException(Optional.of(ex));
                    EventBusInstance.getInstance().post(each);
                    ExecutorExceptionHandler.handleException(ex);
                }
                return null;
            }
            for (AbstractExecutionEvent each : events) {
//                執行成功,更新事件內容,發佈執行成功事件
                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
                EventBusInstance.getInstance().post(each);
            }
            return result;
        }
    }

向上返回到這個方法

com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#execute

//            同步執行->
            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);

 

sql批量執行源碼解析

進入到這個方法

com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#executeBatch

 

@Override
    public int[] executeBatch() throws SQLException {
        try {
            return new BatchPreparedStatementExecutor(
//                    建立批量statement執行器並執行批量sql
                    getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), batchStatementUnits, parameterSets).executeBatch();
        } finally {
//            釋放內存
            clearBatch();
        }
    }
/**
 * 執行批量SQL.
 * 
 * @return 執行結果
 */
public int[] executeBatch() {
    Context context = MetricsContext.start("ShardingPreparedStatement-executeBatch");
    try {
        return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {
            
            @Override
            public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeBatch();
            }
        }));
    } finally {
        MetricsContext.stop(context);
    }
}
/**
 * Executes a batch.
 *
 * @param sqlType SQL type
 * @param batchPreparedStatementUnits collection of statement execution units
 * @param parameterSets set of parameter lists
 * @param executeCallback callback that performs the execution
 * @return execution results
 */
public List<int[]> executeBatch(
        final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {
    // Delegate straight to the shared execute(); the parameter sets are passed through unchanged.
    List<int[]> batchResults = execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
    return batchResults;
}
/**
 * Execution context of a prepared statement object.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
public final class BatchPreparedStatementUnit implements BaseStatementUnit {
    
//    Smallest SQL execution unit
    private final SQLExecutionUnit sqlExecutionUnit;
    
//    Prepared statement object
    private final PreparedStatement statement;

最後調用這個方法

private  <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
//        得到一個sql語句執行單元
        BaseStatementUnit firstInput = iterator.next();
//        異步多線程去執行->
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
        T firstOutput;
        List<T> restOutputs;
        try {
//            同步執行->
            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
//            獲取執行結果
            restOutputs = restFutures.get();
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        List<T> result = Lists.newLinkedList(restOutputs);
        result.add(0, firstOutput);
        return result;
    }

以上是sql執行邏輯的源碼解析。

 

說到最後

以上內容,僅供參考。

相關文章
相關標籤/搜索