sharding-jdbc事務解讀

序言

sharding-jdbc在分庫分表方面提供了很大的便利性,在使用DB的時候,一般都會涉及到事務這個概念,而在分庫分表的環境上再加上事務,就會使事情變得複雜起來。本章試圖剖析sharding-jdbc在事務方面的解決思路。spring

傳統事務回顧

傳統的事務模型以下:sql

Connection conn = getConnection();
try{
    Statement stmt1 = conn.parpareStatement(sql1);
    stmt1.executeUpdate();
    Statement stmt2 = conn.parpareStatement(sql2);
    stmt2.executeUpdate();
    conn.commit();
}catch(Exception e){
    conn.rollback();
}

對於同一個鏈接,能夠執行多條sql語句,任何一條語句出現錯誤的時候,整個操做流程均可以回滾,從而達到事務的原子操做。數據庫

再來看最基本的spring事務操做:緩存

class ServiceA(){
   public void updateA(){...}
}
class ServiceB(){
    public void updateB(){...}
}
@Transactional
class ServiceC(){
    public void updateC(){
        serviceA.updateA();
        serviceB.updateB();
    }
}

咱們知道,當updateC執行的時候,不論是updateA仍是updateB出現了異常,updateC均可以總體回滾,達到原子操做的效果,其主要緣由是updateA和updateB共享了同一個Connection,這是spring底層經過ThreadLocal緩存了Connection實現的。網絡

以上介紹的這兩種狀況都只是針對單庫單表的原子操做,事務的實現並不難理解,那麼在跨庫的狀況下,sharding-jdbc又是如何解決事務問題的呢?異步

shrading-jdbc之弱事務

在官方文檔中,針對弱事務有以下三點說明:分佈式

  • 徹底支持非跨庫事務,例如:僅分表,或分庫可是路由的結果在單庫中。
  • 徹底支持因邏輯異常致使的跨庫事務。例如:同一事務中,跨兩個庫更新。更新完畢後,拋出空指針,則兩個庫的內容都能回滾。
  • 不支持因網絡、硬件異常致使的跨庫事務。例如:同一事務中,跨兩個庫更新,更新完畢後、未提交以前,第一個庫死機,則只有第二個庫數據提交。

爲了理解以上幾點,咱們來看看sharding-jdbc默認是如何處理事務的。ide

這裏寫圖片描述

這是一個很是常見的處理模式,一個總鏈接處理了多條sql語句,最後一次性提交整個事務,每一條sql語句可能會分爲多條子sql分庫分表去執行,這意味着底層可能會關聯多個真正的數據庫鏈接,咱們先來看看若是一切正常,commit會如何去處理。post

public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
    @Override
    public final void commit() throws SQLException {
        Collection<SQLException> exceptions = new LinkedList<>();
        for (Connection each : cachedConnections.values()) {
            try {
                each.commit();
            } catch (final SQLException ex) {
                exceptions.add(ex);
            }
        }
        throwSQLExceptionIfNecessary(exceptions);
    }
}

引擎會遍歷底層全部真正的數據庫鏈接,一個個進行commit操做,若是任何一個出現了異常,直接捕獲異常,可是也只是捕獲而已,而後接着下一個鏈接的commit,這也就很好的說明了,若是在執行任何一條sql語句出現了異常,整個操做是能夠原子性回滾的,由於此時全部鏈接都不會執行commit,但若是已經到了commit這一步的話,若是有鏈接commit失敗了,是不會影響到其餘鏈接的。google

sharding-jdbc之柔性事務

sharding-jdbc的弱事務並非完美的,有時可能會致使數據的一致性問題,因此針對某些特定的場景,又提出了柔性事務的概念。先來看一張官方的說明圖:

這裏寫圖片描述

這裏想表達兩個意思: 
1. 對於sql的執行,在執行前記錄日誌,若是執行成功,把日誌刪除,若是執行失敗,重試必定次數(若是未達到最大嘗試次數便執行成功了,同樣刪除日誌)。 
2. 異步任務不斷掃描執行日誌,若是重試次數未達到最大上限,嘗試從新執行,若是執行成功,刪除日誌。

從上面兩點分析能夠看出,因爲採用的是重試的模式,也就是說同一條語句,是有可能被屢次執行的,因此官方提到了柔性事務的適用場景:

  • 根據主鍵刪除數據。
  • 更新記錄永久狀態,如更新通知送達狀態。

並且它還有必定的限制: SQL須要知足冪等性,具體爲:

  • INSERT語句要求必須包含主鍵,且不能是自增主鍵。
  • UPDATE語句要求冪等,不能是UPDATE xxx SET x=x+1
  • DELETE語句無要求。

在有了一個大概的瞭解以後,咱們來更加深刻的瞭解。

sharding-jdbc使用了google的EventBus事件模型,註冊了一個Listener,監聽器對三種事件進行了處理,以下代碼所示:

switch (event.getEventExecutionType()) {
            case BEFORE_EXECUTE:
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
            case EXECUTE_SUCCESS: 
                transactionLogStorage.remove(event.getId());
                return;
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().release(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        //TODO for batch event need split to 2-level records
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        preparedStatement.executeUpdate();
                        deliverySuccess = true;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default: 
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }

以上代碼能夠抽取爲以下圖的描述: 
這裏寫圖片描述

監聽器根據三種不一樣的事件類型對事務日誌進行不一樣的操做。有監聽 ,必然就有事件的投遞,那麼引擎是何時產生這些事件的呢? 
咱們知道每一條sql語句拆分後有可能對應多條子sql語句,而每一條子sql語句是單獨執行的,執行是封裝在一個內部方法的:

private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, 
                          final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
        synchronized (baseStatementUnit.getStatement().getConnection()) {
            T result;
            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
            ExecutorDataMap.setDataMap(dataMap);
            List<AbstractExecutionEvent> events = new LinkedList<>();
            if (parameterSets.isEmpty()) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
            }
            for (List<Object> each : parameterSets) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
            for (AbstractExecutionEvent event : events) {
                EventBusInstance.getInstance().post(event);
            }
            try {
                result = executeCallback.execute(baseStatementUnit);
            } catch (final SQLException ex) {
                for (AbstractExecutionEvent each : events) {
                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                    each.setException(Optional.of(ex));
                    EventBusInstance.getInstance().post(each);
                    ExecutorExceptionHandler.handleException(ex);
                }
                return null;
            }
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
                EventBusInstance.getInstance().post(each);
            }
            return result;
        }
    }

以上代碼能夠簡化爲以下流程:

這裏寫圖片描述

其實執行流程比較簡單,但還有兩個重要的細節這裏沒有體現:

  1. 當使用柔性事務的時候,須要建立事務管理器,並獲取事務對象,調用事務對象的begin開始一個事務,在這一步,會強制設置鏈接的autoCommit=true,這會致使全部的sql語句執時後當即提交,想一想若是能回滾,那柔性事務也就失去了意義。

  2. 當事務執行begin時,會標記當前不拋出異常,這樣當執行sql語句有異常時,會生成相應的EXECUTE_FAILURE事件,從而進行事務日誌處理,而不是往外拋出異常,當事務結束時,調用事務對象的end方法,恢復異常的捕獲。

一個常見的代碼編寫模式以下(來自官方的demo)

private static void updateFailure(final DataSource dataSource) throws SQLException {
        String sql1 = "UPDATE t_order SET status='UPDATE_1' WHERE user_id=10 AND order_id=1000";
        String sql2 = "UPDATE t_order SET not_existed_column=1 WHERE user_id=1 AND order_id=?";
        String sql3 = "UPDATE t_order SET status='UPDATE_2' WHERE user_id=10 AND order_id=1000";
        SoftTransactionManager transactionManager = new SoftTransactionManager(getSoftTransactionConfiguration(dataSource));
        transactionManager.init();
        BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);
        Connection conn = null;
        try {
            conn = dataSource.getConnection();
            transaction.begin(conn);
            PreparedStatement preparedStatement1 = conn.prepareStatement(sql1);
            PreparedStatement preparedStatement2 = conn.prepareStatement(sql2);
            preparedStatement2.setObject(1, 1000);
            PreparedStatement preparedStatement3 = conn.prepareStatement(sql3);
            preparedStatement1.executeUpdate();
            preparedStatement2.executeUpdate();
            preparedStatement3.executeUpdate();
        } finally {
            transaction.end();
            if (conn != null) {
                conn.close();
            }
        }
    }

看到這個編寫模式,你必定會想,若是我使用MyBatis和spring,這一切可否整合起來,這個話題有興趣你們能夠去嘗試。

總結

分佈式事務處理起來有必定的難度,sharding-jdbc採用了簡單的弱事務模式和特殊場景下的柔性事務模式,沒有最好,只有更好,根據自身業務去選擇事務模式纔是最重要的。

相關文章
相關標籤/搜索