上一篇文章咱們分析了 sharding-jdbc 的 SQL 改寫(改寫),今天咱們分析下 SQL 執行。
SQL 執行主要分爲 2 部分:
/**
 * Runs a single-key SELECT through a {@code ShardingDataSource} and checks the returned key.
 *
 * <p>The data source is obtained from {@code getShardingDataSource()} before each test.
 *
 * @author serlain (2018/11/14 11:21 PM)
 */
public class MyShardingStatementTest extends AbstractShardingDatabaseOnlyDBUnitTest {

    // The key used in the WHERE clause; the assertion below is derived from the same value so
    // that the query and the expectation cannot drift apart.
    // NOTE(review): the original asserted 40 while filtering on order_id = 4 — confirm which
    // value the DBUnit fixture actually contains.
    private static final long ORDER_ID = 4L;

    private static final String SQL = "SELECT o.order_id FROM t_order o WHERE o.order_id = " + ORDER_ID;

    private ShardingDataSource shardingDataSource;

    @Before
    public void init() throws SQLException {
        shardingDataSource = getShardingDataSource();
    }

    @Test
    public void testselect() throws SQLException {
        try (
            Connection connection = shardingDataSource.getConnection();
            Statement stmt = connection.createStatement();
            ResultSet resultSet = stmt.executeQuery(SQL)) {
            assertTrue(resultSet.next());
            // Column 1 is o.order_id, which the WHERE clause pins to ORDER_ID.
            assertThat(resultSet.getLong(1), is(ORDER_ID));
        }
    }
}
/**
 * Returns the sharding data source used by the tests.
 *
 * <p>The existing instance is reused unless none has been created yet or {@code isShutdown}
 * is set; in that case the flag is cleared and a new instance is built for logic table
 * {@code t_order} (actual tables {@code t_order_0} and {@code t_order_1}), sharded on
 * {@code order_id} at both database and table level.
 *
 * @return the cached or newly built data source
 */
protected final ShardingDataSource getShardingDataSource() {
    if (null == shardingDataSource || isShutdown) {
        isShutdown = false;
        DataSourceRule sources = new DataSourceRule(createDataSourceMap("dataSource_%s"));
        TableRule orderRule = TableRule.builder("t_order")
                .dataSourceRule(sources)
                .actualTables(Lists.newArrayList("t_order_0", "t_order_1"))
                .generateKeyColumn("order_id", IncrementKeyGenerator.class)
                .build();
        ShardingRule rule = ShardingRule.builder()
                .dataSourceRule(sources)
                .tableRules(Arrays.asList(orderRule))
                .databaseShardingStrategy(
                        new DatabaseShardingStrategy(Collections.singletonList("order_id"), new MultipleKeysModuloDatabaseShardingAlgorithm()))
                .tableShardingStrategy(new TableShardingStrategy("order_id", new OrderShardingAlgorithm()))
                .build();
        shardingDataSource = new ShardingDataSource(rule);
    }
    return shardingDataSource;
}
複製代碼
咱們可以看到 ShardingDataSource 的構造函數:
ShardingRule:分片的規則,外部自定義;ExecutorEngine:執行引擎,內部初始化;ShardingContext:數據源運行期上下文(這個類很關鍵,把 SQL 執行期間需要的類都貫穿起來,需要就從這個類裏面拿)。
/**
 * Creates a sharding data source from a rule alone, delegating to the two-argument
 * constructor with an empty {@link Properties}.
 *
 * @param shardingRule sharding rule
 */
public ShardingDataSource(final ShardingRule shardingRule) {
    this(
        shardingRule,
        new Properties()
    );
}
/**
 * Creates a sharding data source from a rule and configuration properties.
 *
 * <p>Builds the {@code ShardingProperties}, an {@code ExecutorEngine} sized by
 * {@code EXECUTOR_SIZE}, and the {@code ShardingContext} that ties the rule, the database
 * type and the executor engine together.
 *
 * @param shardingRule sharding rule, must not be null
 * @param props configuration properties, must not be null
 * @throws ShardingJdbcException wrapping any {@link SQLException} raised while building the context
 */
public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
    Preconditions.checkNotNull(shardingRule);
    Preconditions.checkNotNull(props);
    shardingProperties = new ShardingProperties(props);
    int poolSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
    executorEngine = new ExecutorEngine(poolSize);
    try {
        final DatabaseType databaseType = DatabaseType.valueFrom(getDatabaseProductName(shardingRule));
        shardingContext = new ShardingContext(shardingRule, databaseType, executorEngine);
    } catch (final SQLException cause) {
        throw new ShardingJdbcException(cause);
    }
}
複製代碼
/**
 * Initialises the metrics context from the sharding properties and returns a new
 * {@code ShardingConnection} bound to this data source's sharding context.
 *
 * @return a new sharding connection
 * @throws SQLException declared by the overridden method
 */
@Override
public ShardingConnection getConnection() throws SQLException {
    MetricsContext.init(shardingProperties);
    final ShardingConnection connection = new ShardingConnection(shardingContext);
    return connection;
}
複製代碼
/**
 * Creates a {@code ShardingStatement} on this connection with the given result set settings.
 *
 * @param resultSetType result set type
 * @param resultSetConcurrency result set concurrency
 * @return a new sharding statement
 * @throws SQLException declared by the overridden method
 */
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException {
    final Statement statement = new ShardingStatement(this, resultSetType, resultSetConcurrency);
    return statement;
}
/**
 * Creates a sharding statement whose holdability is
 * {@link ResultSet#HOLD_CURSORS_OVER_COMMIT}.
 *
 * @param shardingConnection owning sharding connection
 * @param resultSetType result set type
 * @param resultSetConcurrency result set concurrency
 */
public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) {
    this(
        shardingConnection,
        resultSetType,
        resultSetConcurrency,
        ResultSet.HOLD_CURSORS_OVER_COMMIT
    );
}
/**
 * Creates a sharding statement and records the connection and result set settings that
 * are later applied to every routed statement.
 *
 * @param shardingConnection owning sharding connection
 * @param resultSetType result set type
 * @param resultSetConcurrency result set concurrency
 * @param resultSetHoldability result set holdability
 */
public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
    super(Statement.class);
    // Result set settings first, then the owning connection; the assignments are independent.
    this.resultSetHoldability = resultSetHoldability;
    this.resultSetConcurrency = resultSetConcurrency;
    this.resultSetType = resultSetType;
    this.shardingConnection = shardingConnection;
}
複製代碼
/**
 * Routes and executes a query, then merges the per-shard result sets into one.
 *
 * <p>The current result set is reset to {@code null} once the attempt finishes (whether it
 * succeeded or threw) and is then set to the merged result on success.
 *
 * @param sql logical SQL to execute
 * @return the merged result set
 * @throws SQLException if routing or statement creation fails
 */
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    final ResultSet merged;
    try {
        // Execution happens here; the factory call performs the merge.
        final StatementExecutor executor = generateExecutor(sql);
        merged = ResultSetFactory.getResultSet(executor.executeQuery(), routeResult.getSqlStatement());
    } finally {
        setCurrentResultSet(null);
    }
    setCurrentResultSet(merged);
    return merged;
}
複製代碼
根據路由改寫後的結果(dataSource),建立 SQL Statement;構建 StatementUnit(SQL 語句到 Statement 的映射);構建 StatementExecutor(SQL 執行單元)。
/**
 * Routes the SQL and prepares one physical statement per execution unit.
 *
 * <p>Clears the previous state, stores the routing result in {@code routeResult}, creates a
 * statement on the connection of each unit's data source, replays recorded method
 * invocations onto it, and registers it in {@code routedStatements}.
 *
 * @param sql logical SQL to route
 * @return an executor over the created statement units
 * @throws SQLException if a connection or statement cannot be obtained
 */
private StatementExecutor generateExecutor(final String sql) throws SQLException {
    clearPrevious();
    // Result of routing and rewriting the logical SQL.
    routeResult = new StatementRoutingEngine(shardingConnection.getShardingContext()).route(sql);
    Collection<StatementUnit> units = new LinkedList<>();
    // One statement per execution unit, created on the connection of that unit's data source.
    for (SQLExecutionUnit unit : routeResult.getExecutionUnits()) {
        Statement routed = shardingConnection
                .getConnection(unit.getDataSource(), routeResult.getSqlStatement().getType())
                .createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        replayMethodsInvocation(routed);
        // StatementUnit pairs the execution unit (SQL) with the statement that will run it.
        units.add(new StatementUnit(unit, routed));
        routedStatements.add(routed);
    }
    return new StatementExecutor(
            shardingConnection.getShardingContext().getExecutorEngine(),
            routeResult.getSqlStatement().getType(),
            units);
}
/**
 * Gets the database connection for the given data source name.
 *
 * <p>A cached connection is returned when one exists. Otherwise the data source is looked
 * up in the configured {@code DataSourceRule}; for a {@code MasterSlaveDataSource} the
 * concrete data source and its name are resolved from the SQL type. The new connection is
 * stored in {@code connectionMap} under the resolved name and recorded method invocations
 * are replayed onto it.
 *
 * <p>The metrics context is stopped in a {@code finally} block so that it is also closed
 * when the lookup, the precondition check or {@code DataSource.getConnection()} fails.
 *
 * @param dataSourceName data source name
 * @param sqlType SQL statement type
 * @return database connection
 * @throws SQLException if the underlying data source cannot provide a connection
 * @throws IllegalStateException if no data source is configured under {@code dataSourceName}
 */
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
    // Reuse a cached connection when available.
    Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType);
    if (connection.isPresent()) {
        return connection.get();
    }
    Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));
    String realDataSourceName;
    Connection result;
    try {
        // Look up the data source with the given name in the configured DataSourceRule.
        DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);
        // A master-slave data source resolves to a concrete data source based on the SQL type.
        if (dataSource instanceof MasterSlaveDataSource) {
            dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
            realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
        } else {
            realDataSourceName = dataSourceName;
        }
        result = dataSource.getConnection();
    } finally {
        MetricsContext.stop(metricsContext);
    }
    connectionMap.put(realDataSourceName, result);
    replayMethodsInvocation(result);
    return result;
}
複製代碼
/**
 * Executes the SQL query on every statement unit.
 *
 * <p>The work is handed to the executor engine; the callback supplies the per-unit action,
 * which runs the unit's SQL on the unit's statement. The metrics context is stopped whether
 * or not execution succeeds.
 *
 * @return list of result sets, one per statement unit
 */
public List<ResultSet> executeQuery() {
    final Context timer = MetricsContext.start("ShardingStatement-executeQuery");
    try {
        // The engine decides how units are scheduled; the callback only says what to run.
        final ExecuteCallback<ResultSet> queryCallback = new ExecuteCallback<ResultSet>() {

            @Override
            public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
            }
        };
        return executorEngine.executeStatement(sqlType, statementUnits, queryCallback);
    } finally {
        MetricsContext.stop(timer);
    }
}
複製代碼
/**
 * Executes plain (non-prepared) statements.
 *
 * <p>Delegates to {@code execute} with an empty list of parameter sets.
 *
 * @param sqlType SQL type
 * @param statementUnits statement execution units
 * @param executeCallback callback that performs the actual execution of one unit
 * @param <T> result type
 * @return execution results
 */
public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {
    final List<List<Object>> noParameterSets = Collections.<List<Object>>emptyList();
    return execute(sqlType, statementUnits, noParameterSets, executeCallback);
}
複製代碼
private <T> List<T> execute(
final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
//須要執行的SQL爲空,直接返回空list
if (baseStatementUnits.isEmpty()) {
return Collections.emptyList();
}
Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
//獲取第一個須要執行的
BaseStatementUnit firstInput = iterator.next();
//剩下的異步執行
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try {
//同步執行
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
//等待
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
ExecutorExceptionHandler.handleException(ex);
return null;
}
返回
List<T> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}
複製代碼
異步執行的邏輯:
/**
 * Submits every given unit to the executor service and returns one future for all results.
 *
 * <p>The caller's exception-thrown flag and data map are read once before submission and
 * passed into every task.
 *
 * @param sqlType SQL type
 * @param baseStatementUnits units to execute asynchronously
 * @param parameterSets parameter sets, empty for plain statements
 * @param executeCallback callback that performs the actual execution of one unit
 * @param <T> result type
 * @return a future over the list of all unit results
 */
private <T> ListenableFuture<List<T>> asyncExecute(
        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    final List<ListenableFuture<T>> futures = new ArrayList<>(baseStatementUnits.size());
    final boolean callerThrowsExceptions = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> callerDataMap = ExecutorDataMap.getDataMap();
    for (final BaseStatementUnit unit : baseStatementUnits) {
        final Callable<T> task = new Callable<T>() {

            @Override
            public T call() throws Exception {
                return executeInternal(sqlType, unit, parameterSets, executeCallback, callerThrowsExceptions, callerDataMap);
            }
        };
        futures.add(executorService.submit(task));
    }
    return Futures.allAsList(futures);
}
複製代碼
同步執行的邏輯:
/**
 * Executes one unit on the calling thread, using the current exception-thrown flag and
 * data map.
 *
 * @param sqlType SQL type
 * @param baseStatementUnit unit to execute
 * @param parameterSets parameter sets, empty for plain statements
 * @param executeCallback callback that performs the actual execution
 * @param <T> result type
 * @return the unit's result
 * @throws Exception propagated from {@code executeInternal}
 */
private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
    final boolean throwsExceptions = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> currentDataMap = ExecutorDataMap.getDataMap();
    return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, throwsExceptions, currentDataMap);
}
複製代碼
executeInternal:
/**
 * Executes one statement unit and publishes execution events around it.
 *
 * <p>The whole body runs while holding the monitor of the unit's connection. Before the
 * callback runs, one event is posted per parameter set (or a single event with an empty
 * parameter list when there are none). Afterwards every event is re-posted as
 * {@code EXECUTE_SUCCESS} or, on {@link SQLException}, as {@code EXECUTE_FAILURE} carrying
 * the exception.
 *
 * <p>On failure the exception handler is invoked once, after all failure events have been
 * posted, so a handler that throws cannot prevent the remaining events from being published.
 *
 * @param sqlType SQL type
 * @param baseStatementUnit unit to execute
 * @param parameterSets parameter sets, empty for plain statements
 * @param executeCallback callback that performs the actual execution
 * @param isExceptionThrown exception-thrown flag to apply via {@code ExecutorExceptionHandler}
 * @param dataMap data map to apply via {@code ExecutorDataMap}
 * @param <T> result type
 * @return the callback's result, or {@code null} when a {@link SQLException} was handled
 *         without being rethrown
 * @throws Exception any non-SQL exception from the callback, or whatever the exception
 *         handler throws
 */
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback,
        final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
    synchronized (baseStatementUnit.getStatement().getConnection()) {
        T result;
        // Apply the submitter's settings for this call.
        // NOTE(review): presumably these are thread-local — verify in ExecutorExceptionHandler / ExecutorDataMap.
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<AbstractExecutionEvent> events = new LinkedList<>();
        // No parameter sets: a single before-execution event with an empty parameter list.
        if (parameterSets.isEmpty()) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
        }
        // Otherwise one before-execution event per parameter set.
        for (List<Object> each : parameterSets) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
        }
        // Publish the before-execution events on the shared event bus instance.
        for (AbstractExecutionEvent event : events) {
            EventBusInstance.getInstance().post(event);
        }
        // Run the SQL through the callback.
        try {
            result = executeCallback.execute(baseStatementUnit);
        } catch (final SQLException ex) {
            // Publish a failure event for every parameter set first ...
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                each.setException(Optional.of(ex));
                EventBusInstance.getInstance().post(each);
            }
            // ... then hand the exception to the handler exactly once.
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        // Publish the success events.
        for (AbstractExecutionEvent each : events) {
            each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
            EventBusInstance.getInstance().post(each);
        }
        return result;
    }
}
複製代碼
小尾巴走一波,歡迎關注我的公衆號,不定期分享編程、投資、生活方面的感悟 :)