摘要: 原創出處 www.iocoder.cn/Sharding-JD… 「芋道源碼」歡迎轉載,保留摘要,謝謝!html
本文主要基於 Sharding-JDBC 1.5.0 正式版 java
🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: mysql
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
- 掘金Java QQ 羣:217878901
越過千山萬水(SQL 解析、SQL 路由、SQL 改寫),咱們終於來到了 SQL 執行。開森不開森?!git
本文主要分享SQL 執行的過程,不包括結果聚合。《結果聚合》 東半球第二良心筆者會更新,關注微信公衆號【芋道源碼】完稿後第一時間通知您喲。github
綠框部分 SQL 執行主流程。sql
Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門數據庫
ExecutorEngine,SQL執行引擎。編程
分表分庫,須要執行的 SQL 數量從單條變成了多條,此時有兩種方式執行:數組
前者,編碼容易,性能較差,總耗時是多條 SQL 執行時間累加。
後者,編碼複雜,性能較好,總耗時約等於執行時間最長的 SQL。安全
👼 ExecutorEngine 固然採用的是後者,並行執行 SQL。
Guava( Java 工具庫 ) 提供的繼承自 ExecutorService 的線程服務接口,提供建立 ListenableFuture 功能。ListenableFuture 接口,繼承 Future 接口,有以下好處:
咱們強烈地建議你在代碼中多使用ListenableFuture來代替JDK的 Future, 由於:
- 大多數Futures 方法中須要它。
- 轉到ListenableFuture 編程比較容易。
- Guava提供的通用公共類封裝了公共的操做方方法,不須要提供Future和ListenableFuture的擴展方法。
傳統JDK中的Future經過異步的方式計算返回結果:在多線程運算中可能或者可能在沒有結束返回結果,Future是運行中的多線程的一個引用句柄,確保在服務執行返回一個Result。
ListenableFuture能夠容許你註冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用, 或者在運算(多線程執行)完成後當即執行。這樣簡單的改進,使得能夠明顯的支持更多的操做,這樣的功能在JDK concurrent中的Future是不支持的。
如上內容來自《Google Guava包的ListenableFuture解析
》,文章寫的很棒。下文你會看到 Sharding-JDBC 是如何經過 ListenableFuture 簡化併發編程的。
下面看看 ExecutorEngine 如何初始化 ListeningExecutorService
// ShardingDataSource.java
public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
// .... 省略部分代碼
shardingProperties = new ShardingProperties(props);
int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
executorEngine = new ExecutorEngine(executorSize);
// .... 省略部分代碼
}
// ExecutorEngine
public ExecutorEngine(final int executorSize) {
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}複製代碼
MoreExecutors#listeningDecorator()
建立 ListeningExecutorService,這樣 #submit()
,#invokeAll()
能夠返回 ListenableFuture。#setNameFormat()
併發編程時,必定要對線程名字作下定義,這樣排查問題會方便不少。MoreExecutors#addDelayedShutdownHook()
,應用關閉時,等待全部任務所有完成再關閉。默認配置等待時間爲 60 秒,建議將等待時間作成可配的。數據源關閉時,會調用 ExecutorEngine 也進行關閉。
// ShardingDataSource.java
@Override
public void close() {
executorEngine.close();
}
// ExecutorEngine
@Override
public void close() {
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ignored) {
}
if (!executorService.isTerminated()) {
throw new ShardingJdbcException("ExecutorEngine can not been terminated");
}
}複製代碼
#shutdownNow()
嘗試使用 Thread.interrupt()
打斷正在執行中的任務,未執行的任務再也不執行。建議打印下哪些任務未執行,由於 SQL 未執行,可能數據未能持久化。#awaitTermination()
由於 #shutdownNow()
打斷不是當即結束,須要一個過程,所以這裏等待了 5 秒。ExecutorEngine 對外暴露 #executeStatement()
,#executePreparedStatement()
,#executeBatch()
三個方法分別提供給 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 調用。而這三個方法,內部調用的都是 #execute()
私有方法。
// ExecutorEngine.java
/** * 執行Statement. * @param sqlType SQL類型 * @param statementUnits 語句對象執行單元集合 * @param executeCallback 執行回調函數 * @param <T> 返回值類型 * @return 執行結果 */
public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {
return execute(sqlType, statementUnits, Collections.<List<Object>>emptyList(), executeCallback);
}
/** * 執行PreparedStatement. * @param sqlType SQL類型 * @param preparedStatementUnits 語句對象執行單元集合 * @param parameters 參數列表 * @param executeCallback 執行回調函數 * @param <T> 返回值類型 * @return 執行結果 */
public <T> List<T> executePreparedStatement( final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}
/** * 執行Batch. * @param sqlType SQL類型 * @param batchPreparedStatementUnits 語句對象執行單元集合 * @param parameterSets 參數列表集 * @param executeCallback 執行回調函數 * @return 執行結果 */
public List<int[]> executeBatch(
final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {
return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
}複製代碼
#execute()
執行過程大致流程以下圖:
/** * 執行 * * @param sqlType SQL 類型 * @param baseStatementUnits 語句對象執行單元集合 * @param parameterSets 參數列表集 * @param executeCallback 執行回調函數 * @param <T> 返回值類型 * @return 執行結果 */
private <T> List<T> execute( final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
if (baseStatementUnits.isEmpty()) {
return Collections.emptyList();
}
Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
BaseStatementUnit firstInput = iterator.next();
// 第二個任務開始全部 SQL任務 提交線程池【異步】執行任務
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try {
// 第一個任務【同步】執行任務
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
// 等待第二個任務開始全部 SQL任務完成
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
ExecutorExceptionHandler.handleException(ex);
return null;
}
// 返回結果
List<T> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}複製代碼
#executeInternal()
執行任務。private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
// 【同步】執行任務
return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
}複製代碼
#executeInternal()
執行任務。private <T> ListenableFuture<List<T>> asyncExecute(
final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
for (final BaseStatementUnit each : baseStatementUnits) {
// 提交線程池【異步】執行任務
result.add(executorService.submit(new Callable<T>() {
@Override
public T call() throws Exception {
return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
}
}));
}
// 返回 ListenableFuture
return Futures.allAsList(result);
}複製代碼
Futures.allAsList(result);
和 restOutputs = restFutures.get();
。神器 Guava 簡化併發編程 的好處就提現出來了。ListenableFuture#get()
當全部任務都成功時,返回全部任務執行結果;當任何一個任務失敗時,立刻拋出異常,無需等待其餘任務執行完成。_😮 Guava 真她喵神器,公衆號:【芋道源碼】會更新 Guava 源碼分享的一個系列喲!老司機還不趕忙上車?_
// ExecutorEngine.java
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
synchronized (baseStatementUnit.getStatement().getConnection()) {
T result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<AbstractExecutionEvent> events = new LinkedList<>();
// 生成 Event
if (parameterSets.isEmpty()) {
events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
} else {
for (List<Object> each : parameterSets) {
events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
}
// EventBus 發佈 EventExecutionType.BEFORE_EXECUTE
for (AbstractExecutionEvent event : events) {
EventBusInstance.getInstance().post(event);
}
try {
// 執行回調函數
result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
// EventBus 發佈 EventExecutionType.EXECUTE_FAILURE
for (AbstractExecutionEvent each : events) {
each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
each.setException(Optional.of(ex));
EventBusInstance.getInstance().post(each);
ExecutorExceptionHandler.handleException(ex);
}
return null;
}
// EventBus 發佈 EventExecutionType.EXECUTE_SUCCESS
for (AbstractExecutionEvent each : events) {
each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
EventBusInstance.getInstance().post(each);
}
return result;
}
}複製代碼
result = executeCallback.execute(baseStatementUnit);
執行回調函數。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 經過傳遞執行回調函數( ExecuteCallback )實現給 ExecutorEngine 實現並行執行。public interface ExecuteCallback<T> {
/** * 執行任務. * * @param baseStatementUnit 語句對象執行單元 * @return 處理結果 * @throws Exception 執行期異常 */
T execute(BaseStatementUnit baseStatementUnit) throws Exception;
}複製代碼
synchronized (baseStatementUnit.getStatement().getConnection())
原覺得 Connection 非線程安全,所以須要用同步,後翻查資料《數據庫鏈接池爲何要創建多個鏈接》,Connection 是線程安全的。等跟張亮大神請教確認緣由後,咱會進行更新。
解答:MySQL、Oracle 的 Connection 實現是線程安全的。數據庫鏈接池實現的 Connection 不必定是線程安全,例如 Druid 的線程池 Connection 非線程安全
FROM github.com/dangdangdot…
druid的數據源的stat這種filter在併發使用同一個connection連接時沒有考慮線程安全的問題,故形成多個線程修改filter中的狀態異常。
改造這個問題時,考慮到mysql驅動在執行statement時對同一個connection是線程安全的。也就是說同一個數據庫連接的會話是串行執行的。故在sjdbc的executor對於多線程執行的狀況也進行了針對數據庫連接級別的同步。故該方案不會下降sjdbc的性能。
同時jdk1.7版本的同步採用了鎖升級技術,在碰撞較低的狀況下開銷也是很小的。
ExecutionEvent 這裏先不解釋,在本文第四節【EventBus】分享。
Executor,執行器,目前一共有三個執行器。不一樣的執行器對應不一樣的執行單元 (BaseStatementUnit)。
執行器類 | 執行器名 | 執行單元 |
---|---|---|
StatementExecutor | 靜態語句對象執行單元 | StatementUnit |
PreparedStatementExecutor | 預編譯語句對象請求的執行器 | PreparedStatementUnit |
BatchPreparedStatementExecutor | 批量預編譯語句對象請求的執行器 | BatchPreparedStatementUnit |
StatementExecutor,多線程執行靜態語句對象請求的執行器,一共有三類方法:
#executeQuery()
// StatementExecutor.java
/** * 執行SQL查詢. * @return 結果集列表 */
public List<ResultSet> executeQuery() {
Context context = MetricsContext.start("ShardingStatement-executeQuery");
List<ResultSet> result;
try {
result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {
@Override
public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
} finally {
MetricsContext.stop(context);
}
return result;
}複製代碼
#executeUpdate()
由於有四個不一樣狀況的#executeUpdate()
,因此抽象了 Updater 接口,從而達到邏輯重用。// StatementExecutor.java
/** * 執行SQL更新. * @return 更新數量 */
public int executeUpdate() {
return executeUpdate(new Updater() {
@Override
public int executeUpdate(final Statement statement, final String sql) throws SQLException {
return statement.executeUpdate(sql);
}
});
}
private int executeUpdate(final Updater updater) {
Context context = MetricsContext.start("ShardingStatement-executeUpdate");
try {
List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Integer>() {
@Override
public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
return accumulate(results);
} finally {
MetricsContext.stop(context);
}
}
/** * 計算總的更新數量 * @param results 更新數量數組 * @return 更新數量 */
private int accumulate(final List<Integer> results) {
int result = 0;
for (Integer each : results) {
result += null == each ? 0 : each;
}
return result;
}複製代碼
#execute()
由於有四個不一樣狀況的#execute()
,因此抽象了 Executor 接口,從而達到邏輯重用。/** * 執行SQL請求. * @return true表示執行DQL語句, false表示執行的DML語句 */
public boolean execute() {
return execute(new Executor() {
@Override
public boolean execute(final Statement statement, final String sql) throws SQLException {
return statement.execute(sql);
}
});
}
private boolean execute(final Executor executor) {
Context context = MetricsContext.start("ShardingStatement-execute");
try {
List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Boolean>() {
@Override
public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
if (null == result || result.isEmpty() || null == result.get(0)) {
return false;
}
return result.get(0);
} finally {
MetricsContext.stop(context);
}
}複製代碼
PreparedStatementExecutor,多線程執行預編譯語句對象請求的執行器。比 StatementExecutor 多了 parameters
參數,方法邏輯上基本一致,就不重複分享啦。
BatchPreparedStatementExecutor,多線程執行批量預編譯語句對象請求的執行器。
// BatchPreparedStatementExecutor.java
/** * 執行批量SQL. * * @return 執行結果 */
public int[] executeBatch() {
Context context = MetricsContext.start("ShardingPreparedStatement-executeBatch");
try {
return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {
@Override
public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return baseStatementUnit.getStatement().executeBatch();
}
}));
} finally {
MetricsContext.stop(context);
}
}
/** * 計算每一個語句的更新數量 * * @param results 每條 SQL 更新數量 * @return 每一個語句的更新數量 */
private int[] accumulate(final List<int[]> results) {
int[] result = new int[parameterSets.size()];
int count = 0;
// 每一個語句按照順序,讀取到其對應的每一個分片SQL影響的行數進行累加
for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {
for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {
result[entry.getKey()] += null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
}
count++;
}
return result;
}複製代碼
眼尖的同窗會發現,爲何有 BatchPreparedStatementExecutor,而沒有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操做,只能進行 PreparedStatement 的批操做。
// PreparedStatement 批量操做,不會報錯
PreparedStatement ps = conn.prepareStatement(sql)
ps.addBatch();
ps.addBatch();
// Statement 批量操做,會報錯
ps.addBatch(sql); // 報錯:at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch複製代碼
AbstractExecutionEvent,SQL 執行事件抽象接口。
public abstract class AbstractExecutionEvent {
/** * 事件編號 */
private final String id;
/** * 數據源 */
private final String dataSource;
/** * SQL */
private final String sql;
/** * 參數 */
private final List<Object> parameters;
/** * 事件類型 */
private EventExecutionType eventExecutionType;
/** * 異常 */
private Optional<SQLException> exception;
}複製代碼
AbstractExecutionEvent 有兩個實現子類:
EventExecutionType,事件觸發類型。
那究竟有什麼用途呢? Sharding-JDBC 使用 Guava(沒錯,又是它)的 EventBus 實現了事件的發佈和訂閱。從上文 ExecutorEngine#executeInternal()
咱們能夠看到每一個分片 SQL 執行的過程當中會發布相應事件:
怎麼訂閱事件呢?很是簡單,例子以下:
EventBusInstance.getInstance().register(new Runnable() {
@Override
public void run() {
}
@Subscribe // 訂閱
@AllowConcurrentEvents // 是否容許併發執行,即線程安全
public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent
System.out.println("DMLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
@Subscribe // 訂閱
@AllowConcurrentEvents // 是否容許併發執行,即線程安全
public void listen2(final DQLExecutionEvent event) { //DQLExecutionEvent
System.out.println("DQLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
});複製代碼
#register()
任何類均可以,並不是必定須要使用 Runnable 類。此處例子單純由於方便@Subscribe
註解在方法上,實現對事件的訂閱@AllowConcurrentEvents
註解在方法上,表示線程安全,容許併發執行#listen()
訂閱了 DMLExecutionEvent 事件EventBus#post()
發佈事件,同步調用訂閱邏輯Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門
BestEffortsDeliveryListener,最大努力送達型事務監聽器。
本文暫時暫時不分析其實現,僅僅做爲另一個訂閱者的例子。咱們會在《柔性事務》進行分享。
public final class BestEffortsDeliveryListener {
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
if (!isProcessContinuously()) {
return;
}
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
//TODO 對於批量執行的SQL須要解析成兩層列表
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS:
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE:
boolean deliverySuccess = false;
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
if (!isValidConnection(conn)) {
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO 對於批量事件須要解析成兩層列表
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
deliverySuccess = true;
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}
}複製代碼
本文完,但也未完。
跨分片事務問題。例如:
UPDATE t_order SET nickname = ? WHERE user_id = ?複製代碼
A 節點 connection.commit()
時,應用忽然掛了!B節點 connection.commit()
還來不及執行。
咱們一塊兒去《柔性事務》尋找答案。
道友,分享一波朋友圈可好?