SQL 執行源碼解析
本文轉自「天河聊技術」微信公眾號
找到這個方法
com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#execute
@Override public boolean execute() throws SQLException { try { Collection<PreparedStatementUnit> preparedStatementUnits = route(); // 建立預編譯statement的sql執行器 return new PreparedStatementExecutor( getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute(); } finally { // 釋放內存 clearBatch(); } }
組裝預編譯對象執行單元集合
/** * 預編譯語句對象執行單元. * * @author zhangliang */ @RequiredArgsConstructor @Getter public final class PreparedStatementUnit implements BaseStatementUnit { // sql執行單元 private final SQLExecutionUnit sqlExecutionUnit; // 預編譯對象 private final PreparedStatement statement; }
進入到route()方法
private Collection<PreparedStatementUnit> route() throws SQLException { Collection<PreparedStatementUnit> result = new LinkedList<>(); // 執行sql路由邏輯並獲得路由結果並裝載支持靜態分片的預編譯statement對象 setRouteResult(routingEngine.route(getParameters())); // 遍歷最小sql執行單元 for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) { // 獲取sql類型 SQLType sqlType = getRouteResult().getSqlStatement().getType(); Collection<PreparedStatement> preparedStatements; if (SQLType.DDL == sqlType) { // 若是是DDL,建立DDL的prepareStatement對象 preparedStatements = generatePreparedStatementForDDL(each); } else { // DDL以外的語句建立prepareStatement對象 preparedStatements = Collections.singletonList(generatePreparedStatement(each)); } // 裝載路由的statement對象 getRoutedStatements().addAll(preparedStatements); for (PreparedStatement preparedStatement : preparedStatements) { replaySetParameter(preparedStatement); result.add(new PreparedStatementUnit(each, preparedStatement)); } } return result; }
// 遍歷最小sql執行單元 for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
// 獲取sql類型 SQLType sqlType = getRouteResult().getSqlStatement().getType();
// 若是是DDL,建立DDL的prepareStatement對象 preparedStatements = generatePreparedStatementForDDL(each);
private Collection<PreparedStatement> generatePreparedStatementForDDL(final SQLExecutionUnit sqlExecutionUnit) throws SQLException { Collection<PreparedStatement> result = new LinkedList<>(); // 獲取能夠執行DDL語句的數據庫鏈接對象集合 Collection<Connection> connections = getShardingConnection().getConnectionForDDL(sqlExecutionUnit.getDataSource()); for (Connection each : connections) { // 建立prepareStatement對象 result.add(each.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability())); } return result; }
// 獲取能夠執行DDL語句的數據庫鏈接對象集合 Collection<Connection> connections = getShardingConnection().getConnectionForDDL(sqlExecutionUnit.getDataSource());
public Collection<Connection> getConnectionForDDL(final String dataSourceName) throws SQLException { final Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnectionForDDL", dataSourceName)); // 從分片規則的數據庫分片規則中獲取數據源 DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName); Collection<DataSource> dataSources = new LinkedList<>(); if (dataSource instanceof MasterSlaveDataSource) { dataSources.add(((MasterSlaveDataSource) dataSource).getMasterDataSource()); dataSources.addAll(((MasterSlaveDataSource) dataSource).getSlaveDataSources()); } else { dataSources.add(dataSource); } Collection<Connection> result = new LinkedList<>(); for (DataSource each : dataSources) { // 根據數據源獲取數據庫鏈接 Connection connection = each.getConnection(); replayMethodsInvocation(connection);//從新調用調用過的方法動做 result.add(connection); } MetricsContext.stop(metricsContext); return result; }
向上返回到這裏
private Collection<PreparedStatementUnit> route() throws SQLException { Collection<PreparedStatementUnit> result = new LinkedList<>(); // 執行sql路由邏輯並獲得路由結果並裝載支持靜態分片的預編譯statement對象 setRouteResult(routingEngine.route(getParameters())); // 遍歷最小sql執行單元 for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) { // 獲取sql類型 SQLType sqlType = getRouteResult().getSqlStatement().getType(); Collection<PreparedStatement> preparedStatements; if (SQLType.DDL == sqlType) { // 若是是DDL,建立DDL的prepareStatement對象 preparedStatements = generatePreparedStatementForDDL(each); } else { // DDL以外的語句建立prepareStatement對象 preparedStatements = Collections.singletonList(generatePreparedStatement(each)); } // 裝載路由的statement對象 getRoutedStatements().addAll(preparedStatements); for (PreparedStatement preparedStatement : preparedStatements) { replaySetParameter(preparedStatement); result.add(new PreparedStatementUnit(each, preparedStatement)); } } return result; }
} else { // DDL以外的語句建立prepareStatement對象 preparedStatements = Collections.singletonList(generatePreparedStatement(each)); }
private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException { Optional<GeneratedKey> generatedKey = getGeneratedKey(); // 獲取數據庫鏈接 Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType()); // 建立prepareStatement對象 if (isReturnGeneratedKeys() || isReturnGeneratedKeys() && generatedKey.isPresent()) { return connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS); } return connection.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()); }
獲取數據庫連接對象
// 獲取數據庫鏈接 Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException { // 從緩存中獲取數據源鏈接 Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType); if (connection.isPresent()) { return connection.get(); } Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName)); // 根據數據源名稱獲取數據源對象 DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName); String realDataSourceName; if (dataSource instanceof MasterSlaveDataSource) { dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType); realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType); } else { realDataSourceName = dataSourceName; } Connection result = dataSource.getConnection(); MetricsContext.stop(metricsContext); connectionMap.put(realDataSourceName, result); replayMethodsInvocation(result); return result; }
向上返回到這裏
private Collection<PreparedStatementUnit> route() throws SQLException { Collection<PreparedStatementUnit> result = new LinkedList<>(); // 執行sql路由邏輯並獲得路由結果並裝載支持靜態分片的預編譯statement對象 setRouteResult(routingEngine.route(getParameters())); // 遍歷最小sql執行單元 for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) { // 獲取sql類型 SQLType sqlType = getRouteResult().getSqlStatement().getType(); Collection<PreparedStatement> preparedStatements; if (SQLType.DDL == sqlType) { // 若是是DDL,建立DDL的prepareStatement對象 preparedStatements = generatePreparedStatementForDDL(each); } else { // DDL以外的語句建立prepareStatement對象 preparedStatements = Collections.singletonList(generatePreparedStatement(each)); } getRoutedStatements().addAll(preparedStatements); for (PreparedStatement preparedStatement : preparedStatements) { replaySetParameter(preparedStatement); result.add(new PreparedStatementUnit(each, preparedStatement)); } } return result; }
// 裝載路由的statement對象 getRoutedStatements().addAll(preparedStatements);
向上返回到這裏
@Override public boolean execute() throws SQLException { try { Collection<PreparedStatementUnit> preparedStatementUnits = route(); // 建立預編譯statement的sql執行器 return new PreparedStatementExecutor( getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute(); } finally { // 釋放內存 clearBatch(); } }
進入到這個方法
com.dangdang.ddframe.rdb.sharding.executor.type.prepared.PreparedStatementExecutor#execute 執行 SQL 請求
/**
 * Executes the SQL request.
 *
 * @return the result of the first statement unit (per the original note: {@code true} means
 *         a DQL was executed, {@code false} a DML), or {@code false} when the engine returns
 *         no list, an empty list, or a {@code null} first element
 */
public boolean execute() {
    final Context timer = MetricsContext.start("ShardingPreparedStatement-execute");
    try {
        // Callback applied to each unit: run the underlying prepared statement.
        final ExecuteCallback<Boolean> callback = new ExecuteCallback<Boolean>() {

            @Override
            public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                final PreparedStatement statement = (PreparedStatement) baseStatementUnit.getStatement();
                return statement.execute();
            }
        };
        final List<Boolean> outcomes = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, callback);
        // Only the first unit's outcome is reported.
        if (null != outcomes && !outcomes.isEmpty() && null != outcomes.get(0)) {
            return outcomes.get(0);
        }
        return false;
    } finally {
        MetricsContext.stop(timer);
    }
}
進入到執行prepareStatement對象的方法
com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#executePreparedStatement
/**
 * Executes prepared statements.
 *
 * @param sqlType SQL type
 * @param preparedStatementUnits statement execution units
 * @param parameters parameter list
 * @param executeCallback callback invoked to execute each unit
 * @param <T> type of the per-unit result
 * @return execution results
 */
public <T> List<T> executePreparedStatement(
        final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits,
        final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
    // The shared execute(...) expects a list of parameter sets; wrap the single list as the only set.
    final List<List<Object>> singleParameterSet = Collections.singletonList(parameters);
    return execute(sqlType, preparedStatementUnits, singleParameterSet, executeCallback);
}
com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#execute
進入sql執行引擎的這個方法
private <T> List<T> execute( final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) { if (baseStatementUnits.isEmpty()) { return Collections.emptyList(); } Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator(); // 得到一個sql語句執行單元 BaseStatementUnit firstInput = iterator.next(); // 異步多線程去執行-> ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback); T firstOutput; List<T> restOutputs; try { // 同步執行-> firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback); // 獲取執行結果 restOutputs = restFutures.get(); //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON ExecutorExceptionHandler.handleException(ex); return null; } List<T> result = Lists.newLinkedList(restOutputs); result.add(0, firstOutput); return result; }
// 異步多線程去執行-> ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
進入到這個方法
com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#asyncExecute
private <T> ListenableFuture<List<T>> asyncExecute( final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) { List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size()); // 是否有異常出現 final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); // 執行數據是多線程安全的 final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); for (final BaseStatementUnit each : baseStatementUnits) { // 線程分發執行 result.add(executorService.submit(new Callable<T>() { @Override public T call() throws Exception { return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap); } })); } return Futures.allAsList(result); }
進入這個方法
return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception { // 同一個數據源是串行執行的 synchronized (baseStatementUnit.getStatement().getConnection()) { T result; ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); List<AbstractExecutionEvent> events = new LinkedList<>(); if (parameterSets.isEmpty()) { // 添加執行事件-》 events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList())); } for (List<Object> each : parameterSets) { // 添加執行事件 events.add(getExecutionEvent(sqlType, baseStatementUnit, each)); } for (AbstractExecutionEvent event : events) { // 這裏是事件總線實現,發佈事件 EventBusInstance.getInstance().post(event); } try { // 回調函數獲取回調結果 result = executeCallback.execute(baseStatementUnit); } catch (final SQLException ex) { // 執行失敗,更新事件,發佈執行失敗的事件 for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE); each.setException(Optional.of(ex)); EventBusInstance.getInstance().post(each); ExecutorExceptionHandler.handleException(ex); } return null; } for (AbstractExecutionEvent each : events) { // 執行成功,更新事件內容,發佈執行成功事件 each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS); EventBusInstance.getInstance().post(each); } return result; } }
向上返回到這個方法
com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#execute
// 同步執行-> firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
sql批量執行源碼解析
進入到這個方法
com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#executeBatch
@Override public int[] executeBatch() throws SQLException { try { return new BatchPreparedStatementExecutor( // 建立批量statement執行器並執行批量sql getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), batchStatementUnits, parameterSets).executeBatch(); } finally { // 釋放內存 clearBatch(); } }
/**
 * Executes the batched SQL.
 *
 * @return the value produced by {@code accumulate} from the per-unit batch results
 */
public int[] executeBatch() {
    final Context timer = MetricsContext.start("ShardingPreparedStatement-executeBatch");
    try {
        // Callback applied to each unit: run the underlying statement's batch.
        final ExecuteCallback<int[]> callback = new ExecuteCallback<int[]>() {

            @Override
            public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeBatch();
            }
        };
        return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, callback));
    } finally {
        MetricsContext.stop(timer);
    }
}
/**
 * Executes a batch.
 *
 * @param sqlType SQL type
 * @param batchPreparedStatementUnits statement execution units
 * @param parameterSets parameter sets
 * @param executeCallback callback invoked to execute each unit
 * @return execution results, as returned by the shared {@code execute(...)} method
 */
public List<int[]> executeBatch(
        final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits,
        final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {
    // Pure delegation: the parameter sets are passed through unchanged.
    final List<int[]> batchResults = execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
    return batchResults;
}
/** * 預編譯語句對象的執行上下文. * * @author zhangliang */ @RequiredArgsConstructor @Getter public final class BatchPreparedStatementUnit implements BaseStatementUnit { // sql最小執行單元 private final SQLExecutionUnit sqlExecutionUnit; // 預編譯statement對象 private final PreparedStatement statement;
最後調用這個方法
private <T> List<T> execute( final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) { if (baseStatementUnits.isEmpty()) { return Collections.emptyList(); } Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator(); // 得到一個sql語句執行單元 BaseStatementUnit firstInput = iterator.next(); // 異步多線程去執行-> ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback); T firstOutput; List<T> restOutputs; try { // 同步執行-> firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback); // 獲取執行結果 restOutputs = restFutures.get(); //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON ExecutorExceptionHandler.handleException(ex); return null; } List<T> result = Lists.newLinkedList(restOutputs); result.add(0, firstOutput); return result; }
以上是sql執行邏輯的源碼解析。
說到最後
以上內容,僅供參考。