分佈式事務原理及解決方案

1 引言

分佈式事務是企業集成中的一個技術難點,也是每個分佈式系統架構中都會涉及到的一個東西,特別是在這幾年愈來愈火的微服務架構中,幾乎能夠說是沒法避免,本文就圍繞單機事務,分佈式事務以及分佈式事務的處理方式來展開。java

2 事務

事務提供一種「要麼什麼都不作,要麼作全套(All or Nothing)」的機制,她有ACID四大特性mysql

  • 原子性(Atomicity):事務做爲一個總體被執行,包含在其中的對數據庫的操做要麼所有被執行,要麼都不執行。
  • 一致性(Consistency):事務應確保數據庫的狀態從一個一致狀態轉變爲另外一個一致狀態。一致狀態是指數據庫中的數據應知足完整性約束。除此以外,一致性還有另一層語義,就是事務的中間狀態不能被觀察到(這層語義也有說應該屬於原子性)。
  • 隔離性(Isolation):多個事務併發執行時,一個事務的執行不該影響其餘事務的執行,如同只有這一個操做在被數據庫所執行同樣。
  • 持久性(Durability):已被提交的事務對數據庫的修改應該永久保存在數據庫中。在事務結束時,此操做將不可逆轉。

2.1 單機事務

以mysql的InnoDB存儲引擎爲例,來了解單機事務是如何保證ACID特性的。 git

Image text
事務的隔離性是經過數據庫鎖的機制實現的,持久性經過redo log(重作日誌)來實現,原子性和一致性經過Undo log來實現。

2.2 分佈式事務

單機事務是經過將操做限制在一個會話內經過數據庫自己的鎖以及日誌來實現ACID,那麼分佈式環境下該如何保證ACID特性那?github

2.2.1 XA協議實現分佈式事務

2.2.1.1 XA描述

X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 是X/Open 這個組織定義的一套分佈式事務的標準,也就是了定義了規範和API接口,由各個廠商進行具體的實現。 X/Open DTP 定義了三個組件: AP,TM,RM
web

Image text

  • AP(Application Program):也就是應用程序,能夠理解爲使用DTP的程序
  • RM(Resource Manager):資源管理器,這裏能夠理解爲一個DBMS系統,或者消息服務器管理系統,應用程序經過資源管理器對資源進行控制。資源必須實現XA定義的接口
  • TM(Transaction Manager):事務管理器,負責協調和管理事務,提供給AP應用程序編程接口以及管理資源管理器

其中在DTP定義瞭如下幾個概念spring

  • 事務:一個事務是一個完整的工做單元,由多個獨立的計算任務組成,這多個任務在邏輯上是原子的
  • 全局事務:對於一次性操做多個資源管理器的事務,就是全局事務
  • 分支事務:在全局事務中,某一個資源管理器有本身獨立的任務,這些任務的集合做爲這個資源管理器的分支任務
  • 控制線程:用來表示一個工做線程,主要是關聯AP,TM,RM三者的一個線程,也就是事務上下文環境。簡單的說,就是須要標識一個全局事務以及分支事務的關係

若是一個事務管理器管理着多個資源管理器,DTP是經過兩階段提交協議來控制全局事務和分支事務。sql

  • 第一階段:準備階段 事務管理器通知資源管理器準備分支事務,資源管理器告之事務管理器準備結果
  • 第二階段:提交階段 事務管理器通知資源管理器提交分支事務,資源管理器告之事務管理器結果
2.2.1.2 XA的ACID特性
  • 原子性:XA議使用2PC原子提交協議來保證分佈式事務原子性
  • 隔離性:XA要求每一個RMs實現本地的事務隔離,子事務的隔離來保證整個事務的隔離。
  • 一致性:經過原子性、隔離性以及自身一致性的實現來保證「數據庫從一個一致狀態轉變爲另外一個一致狀態」;經過MVCC來保證中間狀態不能被觀察到。
2.2.1.3 XA的優缺點
  • 優勢:
    對業務無侵入,對RM要求高
  • 缺點:
    同步阻塞:在二階段提交的過程當中,全部的節點都在等待其餘節點的響應,沒法進行其餘操做。這種同步阻塞極大的限制了分佈式系統的性能。

    單點問題:協調者在整個二階段提交過程當中很重要,若是協調者在提交階段出現問題,那麼整個流程將沒法運轉。更重要的是,其餘參與者將會處於一直鎖定事務資源的狀態中,而沒法繼續完成事務操做。

    數據不一致:假設當協調者向全部的參與者發送commit請求以後,發生了局部網絡異常,或者是協調者在還沒有發送完全部 commit請求以前自身發生了崩潰,致使最終只有部分參與者收到了commit請求。這將致使嚴重的數據不一致問題。

    容錯性很差:若是在二階段提交的提交詢問階段中,參與者出現故障,致使協調者始終沒法獲取到全部參與者的確認信息,這時協調者只能依靠其自身的超時機制,判斷是否須要中斷事務。顯然,這種策略過於保守。換句話說,二階段提交協議沒有設計較爲完善的容錯機制,任意一個節點是失敗都會致使整個事務的失敗。

2.2.2 TCC協議實現分佈式事務

2.2.2.1 TCC描述

TCC(Try-Confirm-Cancel)分佈式事務模型相對於 XA 等傳統模型,其特徵在於它不依賴資源管理器(RM)對分佈式事務的支持,而是經過對業務邏輯的分解來實現分佈式事務。
數據庫

Image text

  • 第一階段:CanCommit

3PC的CanCommit階段其實和2PC的準備階段很像。協調者向參與者發送commit請求,參與者若是能夠提交就返回Yes響應,不然返回No響應。
事務詢問:協調者向參與者發送CanCommit請求。詢問是否能夠執行事務提交操做。而後開始等待參與者的響應
響應反饋:參與者接到CanCommit請求以後,正常狀況下,若是其自身認爲能夠順利執行事務,則返回Yes響應,並進入預備狀態;不然反饋No。apache

  • 第二階段:PreCommit
    協調者在獲得全部參與者的響應以後,會根據結果執行2種操做:執行事務預提交,或者中斷事務

執行事務預提交
發送預提交請求:協調者向全部參與者節點發出 preCommit 的請求,並進入 prepared 狀態。
事務預提交:參與者受到 preCommit 請求後,會執行事務操做,對應 2PC 準備階段中的 「執行事務」,也會 Undo 和 Redo 信息記錄到事務日誌中。
各參與者響應反饋:若是參與者成功執行了事務,就反饋 ACK 響應,同時等待指令:提交(commit) 或終止(abort)編程

中斷事務
發送中斷請求:協調者向全部參與者節點發出 abort 請求 。
中斷事務:參與者若是收到 abort 請求或者超時了,都會中斷事務。

  • 第三階段:Do Commit
    該階段進行真正的事務提交,也能夠分爲如下兩種狀況

執行提交
發送提交請求:協調者接收到各參與者發送的ACK響應,那麼他將從預提交狀態進入到提交狀態。並向全部參與者發送 doCommit 請求。
事務提交:參與者接收到 doCommit 請求以後,執行正式的事務提交。並在完成事務提交以後釋放全部事務資源。
響應反饋:事務提交完以後,向協調者發送 ACK 響應。
完成事務:協調者接收到全部參與者的 ACK 響應以後,完成事務。

中斷事務
協調者沒有接收到參與者發送的 ACK 響應(多是接受者發送的不是ACK響應,也可能響應超時),那麼就會執行中斷事務。
發送中斷請求:協調者向全部參與者發送 abort 請求。
事務回滾:參與者接收到 abort 請求以後,利用其在階段二記錄的 undo 信息來執行事務的回滾操做,並在完成回滾以後釋放全部的事務資源。
反饋結果:參與者完成事務回滾以後,向協調者發送 ACK 消息。
中斷事務:協調者接收到參與者反饋的 ACK 消息以後,完成事務的中斷。

2.2.2.2 TCC的ACID特性
  • 原子性:TCC 模型也使用 2PC 原子提交協議來保證事務原子性。Try 操做對應2PC 的一階段準備(Prepare);Confirm 對應 2PC 的二階段提交(Commit),Cancel 對應 2PC 的二階段回滾(Rollback),能夠說 TCC 就是應用層的 2PC。
  • 隔離性:隔離的本質是控制併發,放棄在數據庫層面加鎖經過在業務層面加鎖來實現。【好比在帳戶管理模塊設計中,增長可用餘額和凍結金額的設置】
  • 一致性:經過原子性保證事務的原子提交、業務隔離性控制事務的併發訪問,實現分佈式事務的一致性狀態轉變;事務的中間狀態不能被觀察到這點並不保證[本協議是基於柔性事務理論提出的]。
2.2.2.3 TCC的優缺點
  • 優勢:
    相對於二階段提交,三階段提交主要解決的單點故障問題,並減小了阻塞的時間。由於一旦參與者沒法及時收到來自協調者的信息以後,他會默認執行 commit。而不會一直持有事務資源並處於阻塞狀態。
  • 缺點:
    三階段提交也會致使數據一致性問題。因爲網絡緣由,協調者發送的 Cancel 響應沒有及時被參與者接收到,那麼參與者在等待超時以後執行了 commit 操做。這樣就和其餘接到 Cancel 命令並執行回滾的參與者之間存在數據不一致的狀況。

2.2.3 SAGA協議實現分佈式事務

2.2.3.1 SAGA協議介紹

Saga的組成:

  • 每一個Saga由一系列sub-transaction Ti 組成
  • 每一個Ti 都有對應的補償動做Ci,補償動做用於撤銷Ti形成的結果

saga的執行順序有兩種:

  • T1, T2, T3, ..., Tn
  • T1, T2, ..., Tj, Cj,..., C2, C1,其中0 < j < n

Saga定義了兩種恢復策略:

  • backward recovery,向後恢復,即上面提到的第二種執行順序,其中j是發生錯誤的sub-transaction,這種作法的效果是撤銷掉以前全部成功的sub-transation,使得整個Saga的執行結果撤銷。
  • forward recovery,向前恢復,適用於必需要成功的場景,執行順序是相似於這樣的:T1, T2, ..., Tj(失敗), Tj(重試),..., Tn,其中j是發生錯誤的sub-transaction。該狀況下不須要Ci。

Saga的注意事項

  • Ti和Ci是冪等的。舉個例子,假設在執行Ti的時候超時了,此時咱們是不知道執行結果的,若是採用forward recovery策略就會再次發送Ti,那麼就有可能出現Ti被執行了兩次,因此要求Ti冪等。若是採用backward recovery策略就會發送Ci,而若是Ci也超時了,就會嘗試再次發送Ci,那麼就有可能出現Ci被執行兩次,因此要求Ci冪等。
  • Ci必須是可以成功的,若是沒法成功則須要人工介入。若是Ci不能執行成功就意味着整個Saga沒法徹底撤銷,這個是不容許的。但總會出現一些特殊狀況好比Ci的代碼有bug、服務長時間崩潰等,這個時候就須要人工介入了
  • Ti - Ci和Ci - Ti的執行結果必須是同樣的:sub-transaction被撤銷了。舉例說明,仍是考慮Ti執行超時的場景,咱們採用了backward recovery,發送一個Ci,那麼就會有三種狀況:
    1:Ti的請求丟失了,服務以前沒有、以後也不會執行Ti
    2:Ti在Ci以前執行
    3:Ci在Ti以前執行
    對於第1種狀況,容易處理。對於第二、3種狀況,則要求Ti和Ci是可交換的(commutative),而且其最終結果都是sub-transaction被撤銷。

Saga架構

Image text

  • Saga Execution Component解析請求JSON並構建請求圖
  • TaskRunner 用任務隊列確保請求的執行順序
  • TaskConsumer 處理Saga任務,將事件寫入saga log,並將請求發送到遠程服務
2.2.3.2 SAGA的ACID特性
  • 原子性:經過SAGA協調器實現
  • 一致性:本地事務+SAGA Log
  • 持久性:SAGA Log
  • 隔離性:不保證(同TCC)

3 分佈式事務的處理方案

3.1 XA

僅在同一個事務上下文中須要協調多種資源(即數據庫,以及消息主題或隊列)時,纔有必要使用 X/Open XA 接口。數據庫接入XA須要使用XA版的數據庫驅動,消息隊列要實現XA須要實現javax.transaction.xa.XAResource接口。

3.1.1 jotm的分佈式事務

代碼以下:

public class UserService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private LogDao logDao;
    @Transactional
    public void save(User user){
        userDao.save(user);
        logDao.save(user);
        throw new RuntimeException();
    }
}
@Resource
public class UserDao {
    @Resource(name="jdbcTemplateA")
    private JdbcTemplate jdbcTemplate;
    public void save(User user){
        jdbcTemplate.update("insert into user(name,age) values(?,?)",user.getName(),user.getAge());
    }
}
@Repository
public class LogDao {
    @Resource(name="jdbcTemplateB")
    private JdbcTemplate jdbcTemplate;
    public void save(User user){
        jdbcTemplate.update("insert into log(name,age) values(?,?)",user.getName(),user.getAge());
    }
}
複製代碼

配置:

<bean id="jotm" class="org.objectweb.jotm.Current" />
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="jotm" />
    </bean>
    <tx:annotation-driven transaction-manager="transactionManager"/>
    <!-- 配置數據源 -->
    <bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
        <property name="dataSource">
            <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
                <property name="transactionManager" ref="jotm" />
                <property name="driverName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8" />
            </bean>
        </property>
        <property name="user" value="xxx" />
        <property name="password" value="xxx" />
    </bean>
    <!-- 配置數據源 -->
    <bean id="dataSourceB" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
        <property name="dataSource">
            <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
                <property name="transactionManager" ref="jotm" />
                <property name="driverName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test2?useUnicode=true&amp;characterEncoding=utf-8" />
            </bean>
        </property>
        <property name="user" value="xxx" />
        <property name="password" value="xxx" />
    </bean>
    <bean id="jdbcTemplateA" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceA" />
    </bean>
    <bean id="jdbcTemplateB" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceB" />
    </bean>
複製代碼

使用到的JAR包:

compile 'org.ow2.jotm:jotm-core:2.3.1-M1'
  compile 'org.ow2.jotm:jotm-datasource:2.3.1-M1'
  compile 'com.experlog:xapool:1.5.0'
複製代碼

事務配置: 咱們知道分佈式事務中須要一個事務管理器即接口javax.transaction.TransactionManager、面向開發人員的javax.transaction.UserTransaction。對於jotm來講,他們的實現類都是Current

public class Current implements UserTransaction, TransactionManager

咱們若是想使用分佈式事務的同時,又想使用Spring帶給咱們的@Transactional便利,就須要配置一個JtaTransactionManager,而該JtaTransactionManager是須要一個userTransaction實例的,因此用到了上面的Current,以下配置:

<bean id="jotm" class="org.objectweb.jotm.Current" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">  
    <property name="userTransaction" ref="jotm" />  
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
複製代碼

同時上述StandardXADataSource是須要一個TransactionManager實例的,因此上述StandardXADataSource配置把jotm加了進去.

執行過程:

  • 第一步:事務攔截器開啓事務
    咱們知道加入了@Transactional註解,同時開啓tx:annotation-driven,會對本對象進行代理,加入事務攔截器。在事務攔截器中,獲取javax.transaction.UserTransaction,這裏即org.objectweb.jotm.Current,而後使用它開啓事務,並和當前線程進行綁定,綁定關係數據存放在org.objectweb.jotm.Current中。
  • 第二步:使用jdbcTemplate進行業務操做
    dbcTemplateA要從dataSourceA中獲取Connection,和當前線程進行綁定,同時以對應的dataSourceA做爲key。同時判斷當前線程是否含有事務,經過dataSourceA中的org.objectweb.jotm.Current發現當前線程有事務,則把Connection自動提交設置爲false,同時將該鏈接歸入當前事務中。
    jdbcTemplateB要從dataSourceB中獲取Connection,和當前線程進行綁定,同時以對應的dataSourceB做爲key。同時判斷當前線程是否含有事務,經過dataSourceB中的org.objectweb.jotm.Current發現當前線程有事務,則把Connection自動提交設置爲false,同時將該鏈接歸入當前事務中。
  • 第三步:異常回滾 一旦拋出異常,則須要進行事務的回滾操做。回滾就是將當前事務進行回滾,該事務的回滾會調用和它關聯的全部Connection的回滾。

3.1.2 Atomikos的分佈式事務

代碼同上,配置爲:

<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300" />
    </bean>

    <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="atomikosUserTransaction" />
    </bean>

    <tx:annotation-driven transaction-manager="springTransactionManager"/>

    <!-- 配置數據源 -->
    <bean id="dataSourceC" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="XA1DBMS" />
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="URL">jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8</prop>
                <prop key="user">xxx</prop>
                <prop key="password">xxx</prop>
            </props>
        </property>
        <property name="poolSize" value="3" />
        <property name="minPoolSize" value="3" />
        <property name="maxPoolSize" value="5" />
    </bean>

    <!-- 配置數據源 -->
    <bean id="dataSourceD" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="XA2DBMS" />
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="URL">jdbc:mysql://localhost:3306/test2?useUnicode=true&amp;characterEncoding=utf-8</prop>
                <prop key="user">xxx</prop>
                <prop key="password">xxx</prop>
            </props>
        </property>
        <property name="poolSize" value="3" />
        <property name="minPoolSize" value="3" />
        <property name="maxPoolSize" value="5" />
    </bean>

    <bean id="jdbcTemplateC" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceC" />
    </bean>

    <bean id="jdbcTemplateD" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceD" />
    </bean>
複製代碼

事務配置:
咱們知道分佈式事務中須要一個事務管理器即接口javax.transaction.TransactionManager、面向開發人員的javax.transaction.UserTransaction。對於Atomikos來講分別對應以下:

  • com.atomikos.icatch.jta.UserTransactionImp
  • com.atomikos.icatch.jta.UserTransactionManager 咱們若是想使用分佈式事務的同時,又想使用Spring帶給咱們的@Transactional便利,就須要配置一個JtaTransactionManager,而該JtaTransactionManager是須要一個userTransaction實例的
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">  
    <property name="transactionTimeout" value="300" />  
</bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">  
    <property name="userTransaction" ref="userTransaction" />   
</bean>
<tx:annotation-driven transaction-manager="springTransactionManager"/>
複製代碼

能夠對比下jotm的案例配置jotm的分佈式事務配置。能夠看到jotm中使用的xapool中的StandardXADataSource是須要一個transactionManager的,而Atomikos使用的AtomikosNonXADataSourceBean則不須要。咱們知道,StandardXADataSource中有了transactionManager就能夠獲取當前線程的事務,同時把XAResource加入進當前事務中去,而AtomikosNonXADataSourceBean卻沒有,它是怎麼把XAResource加入進當前線程綁定的事務呢?這時候就須要能夠經過靜態方法隨時獲取當前線程綁定的事務。 使用到的JAR包:

compile 'com.atomikos:transactions-jdbc:4.0.0M4'
複製代碼

3.2 單機事務+同步回調(異步)

以訂單子系統和支付子系統爲例,以下圖:

Image text

如上圖,payment是支付系統,trade是訂單系統,兩個系統對應的數據庫是分開的。支付完成以後,支付系統須要通知訂單系統狀態變動。
對於payment要執行的操做能夠用僞代碼表示以下:

begin tx;
  count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount if (count <= 0) return false update payment_record set status = paid where trade_id = ${tradeId}
commit;
複製代碼

對於trade要執行的操做能夠用僞代碼表示以下:

begin tx;
  count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid if (count <= 0) return false do other things ... commit;
複製代碼

可是對於這兩段代碼如何串起來是個問題,咱們增長一個事務表,即圖中的tx_info,來記錄成功完成的支付事務,tx_info中須要有能夠標示被支付系統處理狀態的字段,爲了和支付信息一致,須要放入事務中,代碼以下:

begin tx;
  count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount if (count <= 0) return false update payment_record set status = paid where trade_id = ${tradeId}
    insert into tx_info values(${trade_id},${amount}...) commit;
複製代碼

支付系統邊界到此爲止,接下來就是訂單系統輪詢訪問tx_info,拉取已經支付成功的訂單信息,對每一條信息都執行trade系統的邏輯,僞代碼以下:

foreach trade_id in tx_info
  do trade_tx
  save tx_info.id to some store
複製代碼

事無延遲取決於時間程序輪詢間隔,這樣咱們作到了一致性,最終訂單都會在支付以後的最大時間間隔內完成狀態遷移。
固然,這裏也能夠採用支付系統經過RPC方式同步通知訂單系統的方式來實現,處理狀態經過tx_info中的字段來表示。
另外,交易系統每次拉取數據的起點以及消費記錄須要記錄下來,這樣才能不遺漏不重複地執行,因此須要增長一張表用於排重,即上圖中的tx_duplication。可是每次對tx_duplication表的插入要在trade_tx的事務中完成,僞代碼以下:

begin tx;
  c = insert ignore tx_duplication values($trade_id...) if (c <= 0) return false count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid if (count <= 0) return false do other things ... commit;
複製代碼

另外,tx_duplication表中trade_id表上必須有惟一鍵,這個算是結合以前的冪等篇來保證trade_tx的操做是冪等的。

3.3 MQ作中間表角色

在上面的方案中,tx_info表所起到的做用就是隊列做用,記錄一個系統的表更,做爲通知給須要感知的系統的事件。而時間程序去拉取只是系統去獲取感興趣事件的一個方式,而對應交易系統的本地事務只是對應消費事件的一個過程。在這樣的描述下,這些功能就是一個MQ——消息中間件。以下圖

Image text

這樣tx_info表的功能就交給了MQ,消息消費的偏移量也不須要關心了,MQ會搞定的,可是tx_duplication仍是必須存在的,由於MQ並不能避免消息的重複投遞,這其中的緣由有不少,主要是仍是分佈式的CAP形成的,再次不詳細描述。
這要求MQ必須支持事務功能,能夠達到本地事務和消息發出是一致性的,可是沒必要是強一致的。一般使用的方式以下的僞代碼:

sendPrepare();
  isCommit = local_tx()
  if (isCommit) sendCommit()
    else sendRollback()
複製代碼

在作本地事務以前,先向MQ發送一個prepare消息,而後執行本地事務,本地事務提交成功的話,向MQ發送一個commit消息,不然發送一個abort消息,取消以前的消息。MQ只會在收到commit確認纔會將消息投遞出去,因此這樣的形式能夠保證在一切正常的狀況下,本地事務和MQ能夠達到一致性。
可是分佈式存在異常狀況,網絡超時,機器宕機等等,好比當系統執行了local_tx()成功以後,還沒來得及將commit消息發送給MQ,或者說發送出去了,網絡超時了等等緣由,MQ沒有收到commit,即commit消息丟失了,那麼MQ就不會把prepare消息投遞出去。若是這個沒法保證的話,那麼這個方案是不可行的。針對這種狀況,須要一個第三方異常校驗模塊來對MQ中在必定時間段內沒有commit/abort 的消息和發消息的系統進行檢查,確認該消息是否應該投遞出去或者丟棄,獲得系統的確認以後,MQ會作投遞仍是丟棄,這樣就徹底保證了MQ和發消息的系統的一致性,從而保證了接收消息系統的一致性。
這個方案要求MQ的系統可用性必須很是高,至少要超過使用MQ的系統(推薦rocketmq,kafka都支持發送預備消息和業務回查),這樣才能保證依賴他的系統能穩定運行。

3.4 SAGA方案

項目地址:github.com/apache/serv… Saga處理場景是要求相關的子事務提供事務處理函數同時也提供補償函數。Saga協調器alpha會根據事務的執行狀況向omega發送相關的指令,肯定是否向前重試或者向後恢復。

成功場景

成功場景下,每一個事務都會有開始和有對應的結束事件。

Image text

異常場景

異常場景下,omega會向alpha上報中斷事件,而後alpha會向該全局事務的其它已完成的子事務發送補償指令,確保最終全部的子事務要麼都成功,要麼都回滾。

Image text

超時場景

超時場景下,已超時的事件會被alpha的按期掃描器檢測出來,與此同時,該超時事務對應的全局事務也會被中斷。

Image text

例子

假設要租車、預訂酒店知足分佈式事務。
租車服務

@Service
class CarBookingService {
  private Map<Integer, CarBooking> bookings = new ConcurrentHashMap<>();

  @Compensable(compensationMethod = "cancel")
  void order(CarBooking booking) {
    booking.confirm();
    bookings.put(booking.getId(), booking);
  }

  void cancel(CarBooking booking) {
    Integer id = booking.getId();
    if (bookings.containsKey(id)) {
      bookings.get(id).cancel();
    }
  }

  Collection<CarBooking> getAllBookings() {
    return bookings.values();
  }

  void clearAllBookings() {
    bookings.clear();
  }
}
複製代碼

酒店預訂

@Service
class HotelBookingService {
  private Map<Integer, HotelBooking> bookings = new ConcurrentHashMap<>();

  @Compensable(compensationMethod = "cancel")
  void order(HotelBooking booking) {
    if (booking.getAmount() > 2) {
      throw new IllegalArgumentException("can not order the rooms large than two");
    }
    booking.confirm();
    bookings.put(booking.getId(), booking);
  }

  void cancel(HotelBooking booking) {
    Integer id = booking.getId();
    if (bookings.containsKey(id)) {
      bookings.get(id).cancel();
    }
  }

  Collection<HotelBooking> getAllBookings() {
    return bookings.values();
  }

  void clearAllBookings() {
    bookings.clear();
  }
}
複製代碼

主服務

@RestController
public class BookingController {

  @Value("${car.service.address:http://car.servicecomb.io:8080}")
  private String carServiceUrl;

  @Value("${hotel.service.address:http://hotel.servicecomb.io:8080}")
  private String hotelServiceUrl;

  @Autowired
  private RestTemplate template;

  @SagaStart
  @PostMapping("/booking/{name}/{rooms}/{cars}")
  public String order(@PathVariable String name, @PathVariable Integer rooms, @PathVariable Integer cars) {
    template.postForEntity(
        carServiceUrl + "/order/{name}/{cars}",
        null, String.class, name, cars);

    postCarBooking();

    template.postForEntity(
        hotelServiceUrl + "/order/{name}/{rooms}",
        null, String.class, name, rooms);

    postBooking();

    return name + " booking " + rooms + " rooms and " + cars + " cars OK";
  }

  // This method is used by the byteman to inject exception here
  private void postCarBooking() {

  }

  // This method is used by the byteman to inject the faults such as the timeout or the crash
  private void postBooking() {

  }
}
複製代碼

執行流程

  • 在Alpha目錄執行 mvn clean package -DskipTests -Pdemo
  • 執行 java -Dspring.profiles.active=prd -D"spring.datasource.url=jdbc:postgresql://{host_address}:5432/saga?useSSL=false" -jar alpha-server-{saga_version}-exec.jar
  • 在saga spring demo目錄執行 mvn clean package -DskipTests -Pdemo
  • java -Dserver.port=8081 -Dalpha.cluster.address={alpha_address}:8080 -jar hotel-{saga_version}-exec.jar
  • java -Dserver.port=8082 -Dalpha.cluster.address={alpha_address}:8080 -jar car-{saga_version}-exec.jar
  • java -Dserver.port=8083 -Dalpha.cluster.address={alpha_address}:8080 -Dcar.service.address={host_address}:8082 -Dhotel.service.address={host_address}:8081  -jar booking-{saga_version}-exec.jar[alpha_address不帶http其餘地址要帶上http]

3.5 TCC方案

項目地址https://github.com/QNJR-GROUP/EasyTransaction[對比tcc-transaction,Hmily,ByteTCC來講EasyTransaction性能最好,壓測未發現錯誤], 固然你也可使用上面提到的SAGA項目,也是支持TCC協議的。下面咱們舉個例子來看TCC是如何處理業務邏輯的。

eg:訂單支付

  • 1:訂單服務->修改訂單狀態
  • 2:庫存服務->扣減庫存
  • 3:積分服務->增長積分
  • 4:倉庫服務->建立出庫單

try階段

  • 1:訂單服務->狀態變動爲「UpDating」
  • 2:庫存服務->可用庫存減小1,凍結庫存增長1
  • 3:積分服務->積分不變,增長預備積分
  • 4:倉庫服務->建立出庫單,狀態設置爲「UnKnown」

confirm階段

  • 1:訂單服務->狀態變動爲「已支付」
  • 2:庫存服務->凍結庫存清零
  • 3:積分服務->積分增長,預備積分清零
  • 4:倉庫服務->狀態設置爲「出庫單已建立」

cancel階段

  • 1:訂單服務->狀態變動爲「已取消」
  • 2:庫存服務->可用庫存增長,凍結庫存清零
  • 3:積分服務->預備積分清零
  • 4:倉庫服務->狀態設置爲「已取消」

4 小結

基本概念 優勢 缺點
本地事務。事務由資源管理器(如DBMS)本地管理 嚴格的ACID 不具有分佈事務處理能力
全局事務(DTP模型)
TX協議:應用或應用服務器與事務管理器的接口
XA協議:全局事務管理器與資源管理器的接口
嚴格的ACID 效率很是低
JTA:面向應用、應用服務器與資源管理器的高層事務接口
JTS:JTA事務管理器的實現標準,向上支持JTA,向下經過CORBA OTS實現跨事務域的互操做性
EJB
簡單一致的編程模型
跨域分佈處理的ACID保證
DTP模型自己的侷限
缺乏充分公開的大規模、高可用、密集事務應用的成功案例
基於MQ 消息數據獨立存儲、獨立伸縮
下降業務系統與消息系統間的耦合
一次消息發送須要兩次請求
業務處理服務需實現消息狀態回查接口
二階段提交 原理簡單,實現方便 同步阻塞:在二階段提交的過程當中,全部的節點都在等待其餘節點的響應,沒法進行其餘操做。這種同步阻塞極大的限制了分佈式系統的性能。
單點問題:協調者在整個二階段提交過程當中很重要,若是協調者在提交階段出現問題,那麼整個流程將沒法運轉。更重要的是,其餘參與者將會處於一直鎖定事務資源的狀態中,而沒法繼續完成事務操做。
數據不一致:假設當協調者向全部的參與者發送commit請求以後,發生了局部網絡異常,或者是協調者在還沒有發送完全部 commit請求以前自身發生了崩潰,致使最終只有部分參與者收到了commit請求。這將致使嚴重的數據不一致問題。
容錯性很差:若是在二階段提交的提交詢問階段中,參與者出現故障,致使協調者始終沒法獲取到全部參與者的確認信息,這時協調者只能依靠其自身的超時機制,判斷是否須要中斷事務。顯然,這種策略過於保守。換句話說,二階段提交協議沒有設計較爲完善的容錯機制,任意一個節點是失敗都會致使整個事務的失敗。
TCC 相對於二階段提交,三階段提交主要解決的單點故障問題,並減小了阻塞的時間。由於一旦參與者沒法及時收到來自協調者的信息以後,他會默認執行 commit。而不會一直持有事務資源並處於阻塞狀態。 三階段提交也會致使數據一致性問題。因爲網絡緣由,協調者發送的 abort 響應沒有及時被參與者接收到,那麼參與者在等待超時以後執行了 commit 操做。這樣就和其餘接到 abort 命令並執行回滾的參與者之間存在數據不一致的狀況。
SAGA 簡單業務使用TCC須要修改原來業務邏輯,saga只須要添加一個補償動做
因爲沒有預留動做因此不用擔憂資源釋放的問題異常處理簡單
因爲沒有預留動做致使補償處理麻煩

業務各有各的不一樣,有些業務能容忍短時間不一致,有些業務的操做能夠冪等,不管什麼樣的分佈式事務解決方案都有其優缺點,沒有一個銀彈可以適配全部。所以,業務須要什麼樣的解決方案,還須要結合自身的業務需求、業務特色、技術架構以及各解決方案的特性,綜合分析,才能找到最適合的方案。

相關文章
相關標籤/搜索