輕鬆構建微服務之分佈式事物

微信公衆號:內核小王子
關注可瞭解更多關於數據庫,JVM內核相關的知識;
若是你有任何疑問也能夠加我pigpdong[1]
此爲輕鬆構建微服務系列的第三篇文章node

拜占庭將軍問題

在存在消息丟失的不可信信道上經過消息傳遞的方式獲得一致性是不可能的mysql

因爲當時拜占庭帝國國土遼闊,爲了防護目的,每一個軍隊都分隔很遠,將軍和將軍之間只能靠信差來通訊,可是軍隊內可能存在間諜和叛徒,那麼如何在已知有叛徒的狀況下,在其餘忠誠的將軍之間達成一致性協定。映射到網絡問題,例如網絡被劫持,網絡不可達等算法

CAP

一個分佈式系統中,最多隻能同時知足一致性,可用性,分區容忍性spring

  • 一致性(Consistance)sql

一致性是指,all nodes see the same data at same time,怎麼理解這句話? 假設在一個有N個節點的集羣環境下,同一時刻訪問不一樣的節點的同一個數據,返回的值應該同樣.數據庫

例如一個寫請求發送到節點A將值從5改成6,在寫操做以前假設全部節點上的數據都是5,那麼當節點A把值從5改成6以後,還須要把這個操做同步到其餘的節點,直到其餘節點都同步成功後,才能返回客戶端寫成功,而且在這個過程當中節點若是已經修改成6的值不能被外界看到.緩存

  • 可用性 (Availablity)微信

可用性是指 read and write always succeed.是指服務永遠能夠響應客戶端的請求,這個也是在集羣環境下,當一個節點不可用的時候,能夠將請求發到其餘節點上,一個節點不可用不必定是由於宕機,網絡不通等緣由,也有多是這個節點的數據沒有複製全.固然狹義上的可用性也能夠指代請求響應時間,在如今高併發的環境下,若是一個請求的響應時間太長,一方面影響用戶體驗倒置不可用,另外一方面因爲響應時間太長倒置QPS過低,當客戶端大量請求打過來的時候會將服務端打垮倒置機器不可用.
在一個分佈式系統中,上下游系統任何一個節點出故障均可能倒置整個系統不可用,像數據庫,負載均衡,緩存,微服務中的某一個服務,因此分佈式系統在知足高可用的需求上須要進行彈性設計,便可以支持服務降級,限流,以及監控等.網絡

  • 分區容忍性架構

分區容忍性是指 the system continue to operate despite arbitrary message loss or failure of part of the system, 也就是說當網絡存在丟包和某幾個節點網絡不可達的狀況下,整個服務集羣能夠繼續保持可用,operate是經營,管理的意思,despite是儘管的意思,arbitrary是隨意的意思.

在分佈式系統中,只有多個節點才能保證一個節點掛了另一個節點能夠頂上來實現高可用,不能存在單點,而爲了知足高可用必須保證每一個節點上都有完整的數據,也就是須要各個節點之間須要進行數據複製,這個複製操做只能經過網絡,而網絡環境是不穩定的,光纖被挖,機房斷電,操做員失誤等是機器沒法預料的,當網絡的不可靠就可能引發數據不一樣步,數據不一樣步就會致使節點不可用,因此在現代軟件架構中,會在三者之間作一個權衡,例如對一致性,能夠放低要求作到弱一致性,就是運行中間短暫的數據不一致,而最終數據仍是一致的,因此出來了一個變種就是BASE.
basic avalilablity,基本可用意味着系統能夠出現短暫的不可用,後面會恢復,Soft-state 軟狀態,是在有狀態和無狀態之間的一種中間態,運行應用短暫的保存一小部分數據和狀態.Eventual Consistency 最終一致性,系統在一個短暫的時間段內是不一致的,但最終會恢復一致.

2PC

兩階段提交Two Phase Commit,一階段,協調者發起prepare操做,參與者將操做結果同步給協調者,二階段 由協調者根據上一階段操做的結果,決定提交仍是回滾.也就是全部參與者都返回成功就提交,有一個參與者返回失敗就回滾.

該協議比較簡單,咱們對照下圖進行理解,就不在具體分析過程了.

兩階段提交有如下缺點

  • 1.單點故障 一旦協調者發生故障,那麼事物的其餘參與者會一直阻塞下去,尤爲在第二階段,因爲一階段已經將資源鎖定,協調者能夠設計爲分佈式集羣部署,當master節點宕機,能夠從新選舉一個master,可是協調者就須要將參與者的返回結果狀態進行持久化,而且和其餘協調者進行同步,這個過程又會涉及新的一致性問題.

  • 2.同步阻塞問題,阻塞週期長,當第一個參與者鎖定資源後須要等到階段2提交後纔會釋放資源

  • 3.可能出現數據不一致,當協調者發出commit後,某一個參與者因爲網絡緣由沒有收到,而其餘參與者已經commit,因而出現數據不一致

基於兩階段提交的缺陷,後來提出了三階段提交的方案,就是在2pc的基礎上增長了一個CanCommit的詢問動做,這一步能夠保證後續的commit操做大機率會成功,例如從用戶帳戶扣50塊錢,而canCommit詢問的時候發現帳戶只有40,就會直接返回不能夠提交,提升成功率,同時減小了鎖定資源的時間,可是3PC並無解決2PC的問題
固然三階段也設置了超時時間,二階段只有協調中的請求會有超時時間.

XA

X/OPEN 組織定義了一套分佈式事物處理模型,也就是Distributed Trasaction Processing Reference Model 簡稱DTP模型,並定義了兩套協議,分別是XA,TX

從上圖能夠看出,DTP使用了兩階段提交協議,並提出了RM,AP,TM的概念,目前mysql的innodb有對XA的支持,spring也封裝了XA相關的接口,可是XA存在兩階段提交相關問題,因此目前在互聯網公司高併發的場景下並無太普及.

業務補償

在分佈式環境下,因爲在大多數狀況下咱們沒法作到強一致性的ACID,特別是咱們的系統跨越多個系統,並且有些系統可能還須要調用其餘公司的服務,咱們要保證一個事物所有執行成功,要麼所有回滾,在哪些場景須要回滾,哪些場景不須要回滾可能會和業務強相關.

業務補償的設計,就是業務正常執行,當遇到某個分支場景不可用的時候就啓用回滾流程,將以前的流程進行逆向操做.咱們用客戶購買理財產品的例子,第一步 下訂單,第二步,扣產品份額,第三步,支付.第四部給用戶加持倉,第五步將訂單狀態改成成功.

  • 1.扣份額失敗則直接修改訂單狀態爲失敗

  • 2.若是支付失敗,則回滾產品份額,修改訂單狀態爲失敗

  • 3.若是加持倉失敗就一直重試

  • 4.若是修改訂單狀態失敗就一直重試

  • 5.若是支付返回處理中就不停輪詢支付狀態直到有肯定結果

根據以上流程咱們能夠總結下采用補償方案的設計重點

  • 1.能夠採用工做量引擎進行業務補償操做

  • 2.業務服務須要支持重試

  • 3.業務服務要冪等

  • 4.業務服務要提升回滾接口,如上面的回滾標的份額

  • 5.有些業務失敗可能不須要進行回滾,因此業務補償和業務關聯緊密,很難用中間件解決

  • 6.有些業務可能沒有明確結果,須要採用JOB進行狀態查詢

  • 7.業務服務要支持狀態查詢的接口

  • 8.補償業務不必定是強相關或依賴的,有些服務能夠並行執行能夠提升效率

TCC

TCC是(Try Commit Cancel)的簡稱,源於國外的一篇論文,最先由阿里的程立博士在infoQ的一篇介紹中引入國內,目前國內大多數互聯網公司都在採用這種方案.

咱們用一個帳戶A轉帳給帳戶B100元爲例子

  • Try  預留資源,完成一致性檢查 (帳戶A可用餘額減小100,凍結金額增長100元,帳戶B凍結金額增長100)

  • Commit 執行業務,不作任何一致性檢查,並且只能使用try中預留的資源 (帳戶A凍結金額減小100,帳戶B可用餘額增長100,凍結金額減小100)

  • Cancel 釋放TRY階段預留的資源 (帳戶A可用餘額增長100,凍結金額減小100,帳戶B凍結金額減小100)

 

TCC模式本質上也是兩階段提交,上圖對比了TCC和XA方案,二者一個是業務操做一個是針對數據庫,因此TCC方案不會採用數據庫本地事物去鎖定資源,因此使用TCC也須要各個接口可以支持冪等,而且可以重試,並且須要提供狀態查詢接口,否則在網絡超時後,發起方不肯定分支事物是執行成功仍是失敗.

SAGA

saga提供了一種根據工做流引擎進行管理事物的提交和回滾,流程上相似於上面的事物補償機制

可靠消息最終一致性

大多時候咱們但願多個業務可以並行處理,這個時候咱們能夠藉助消息隊列來異步通知其餘應用進行相應的操做,那麼怎麼保證本地事物和接受消息的應用上處理的服務要麼所有成功,要麼所有失敗呢,咱們根據以前的最終一致性方案,容許兩個分支事物能夠前後執行,可是最終確定會執行,不會不執行.

這種方案其實也是業務補償的一種,只是藉助消息隊列進行解耦和異步通知,下面咱們分析下這種方案的兩個難點.

  • 如何保證消息投遞和本地事物要麼所有成功,要麼所有失敗
    首先,事物管理器先發送一條記錄給消息服務,消息服務將這條消息存儲,狀態記錄爲待確認,而後執行本地數據庫操做,而後發送確認消息給消息服務,這兩個動做能夠放在一個本地事物內,若是本地數據庫操做成功,消息確認失敗,就將本地數據庫操做回滾,若是本地數據庫操做失敗就發消息給消息服務請求刪除消息.
    若是發送給消息服務的確認和刪除因爲網絡沒有迴應,那麼就須要把在消息服務裏定時輪詢事物的狀態,也就是消息服務去反查服務發送方,而後決定提交消息或者刪除消息,因此消息服務和服務發送方應該維護一個雙向通道,rocketMQ的作法是將PID和對應的channel緩存起來

  • 如何保證消息百分百會被消費
    消費端消費完消息後,給個確認消息給消息服務,消息服務不停輪詢當前消息列表中,查看是否存在沒有消費完成的消息,若是存在就讓消息隊列從新通知一次.

  • 如何保證消息不會重複消費

原則上咱們但願分支事物本身可以支持冪等,若是必定要讓中間件去重,實際上消息隊列去重的代價是很大的,會犧牲掉高可用性,咱們能夠在應用層維護一張表去存儲已經處理的消息,這樣能夠根據消息ID去重.

FESCAR

FESCAR (Fast easy commit and rollback),是阿里GTS的社區開源版本,基於現有的分佈式事物解決方案,要麼像XA這種會有嚴重的性能問題,要麼像業務補償,TCC,SAGA,可靠消息保證等,這種須要根據業務場景在應用中進行定製,咱們不但願引入微服務後給業務層帶來額外的研發負擔,另一方面不但願引入分佈式事物後拖慢業務,因此FesCar的初衷就是對業務0侵入,而且高性能.

下面先經過官方上的介紹,看下fescar的思想,後面在結合代碼看下fescar的具體實現細節

咱們先簡單回顧下XA裏面分佈式事物的三個角色,TM事物管理器.負責發起一個全局事物,TC事物協調器,獨立在應用層外,負責維護全局事物和分支事物的狀態,來決策提交仍是回滾,RM:資源管理器,存儲實際狀態的容器,像硬盤,數據庫,消息中間件,內存等均可以稱爲RM,Fescar將RM做爲一個二方包的形式遷入到了應用層,做爲應用層和TC進行通信的代理,而且協助本地事物處理.

  • 1.TM向TC申請開啓一個全局事物,全局事物建立成功,並返回一個惟一的XID

  • 2.XID在微服務調用鏈路中進行傳遞,dubbo能夠經過filter方式,spring cloud也很方便,RPC服務端收到XID後存放在本地線程局部變量threadlocal中

  • 3.分支事物RM向TC註冊分支事物,將其歸入XID對應的全局事物管理中

  • 4.TM向TC發起針對XID的全局事物的提交或者回滾操做

  • 5.TC調用XID管轄下的分支事物完成提交和回滾

FESCAR取消了數據庫層的prepare操做,而是直接進行commit操做,這樣就不會帶來昂貴的數據庫鎖開銷,而每個commit操做對應的都是數據庫的本地事物,這個改變是Fescar性能高的主要緣由,同時使他犧牲了隔離性,致使目前Fescar只能支持讀未提交的隔離級別,若是要實現讀已提交須要應用層作一些定製.

Fescar的RM插件,從新實現了一遍JDBC,從而攔截掉數據庫的sql進行解析,並生成undolog,以及事物提交後的加強處理,這種設計使應用方徹底無感,只須要開啓一個全局事物

 

undolog是存儲在業務方本地的數據庫實例裏,這樣業務更新和插入undolog在一個本地事物內,能夠保證事物回滾的時候必定有undolog.

 

目前fescar剛開源,在可靠性上還須要驗證,目前社區也在計劃完善一些新功能.

下面咱們分析下Fescar的一些核心功能

事物管理器

咱們先看下全局事物的三個核心接口,begin,commit,rollback,我在代碼中都加了註釋

public interface GlobalTransaction {

    /**
     * Begin a new global transaction with default timeout and name.
     *
     * 開啓一個全局事物,像TC發起請求,返回一個XID,並將這個XID進行緩存到ThreadLocal
     *
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    void begin() throws TransactionException;

    /**
     * Commit the global transaction.
     *
     * 提交一個全局事物,將XID發給TC,並清除threadlocal裏的緩存
     *
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    void commit() throws TransactionException;

    /**
     * Rollback the global transaction.
     *
     * 回滾一個全局事物,將XID發給TC,並清除threadlocal裏的緩存
     *
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    void rollback() throws TransactionException;
}

而後咱們在看下事物處理模板,也就是咱們使用的入口,也是接入fescar惟一要關心的一個地方

public class TransactionalTemplate {

    /**
     * Execute object.
     *
     * @param business the business 只須要傳人一個TransactionalExecutor就能夠了,業務實現放在execute裏面就能夠了
     * @return the object
     * @throws ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. begin transaction
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }

        Object rs = null;
        try {

            // Do Your Business
            rs = business.execute();

        } catch (Throwable ex) {

            // 3. any business exception, rollback.
            try {
                tx.rollback();

                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);

            }

        }

        // 4. everything is fine, commit.
        try {
            tx.commit();

        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);

        }
        return rs;
    }

}

有沒有發現跟咱們平時使用的JTA用法同樣,因爲分支事物是RPC調用,因此存在網絡超時的狀況,因此分支事物若是超時了,即便分支事物的本地執行成功了,全局事物同樣會進行回滾,由於這裏會捕獲這個超時異常,後面咱們在分析爲何要這樣設計.

xid怎麼在rpc中傳遞

咱們在作分佈式鏈路監控的時候,也須要在rpc之間傳遞一個traceid,方法相似,若是是dubbo,咱們能夠寫一個filter

@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);

        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

RootContext是fescar的threadlocal容器,RpcContext是dubbo得threadcontext容器,Attachment可讓dubbo在遠程調用過程當中攜帶更多地參數,服務調用方傳遞xid,服務提供方接收xid並保存,服務調用結束記得清空threadlocal以防止內存泄露.

分支在一階段的處理流程

PreparedStatementProxy ->  ExecuteTemplate ->  UpdateExecutor ->ConnectionProxy

  • 0.sql進入jdbc的PreparedStatement中,而後這個jdbc對象被PreparedStatementProxy代理,進入他的execute方法

  • 1.查看本身是否在一個Fescar的全局事物中,根據線程本地變量threadlocal中是否存在全局事物id進行判斷,具體代碼在ExecuteTemplate中

  • 2.若是存在,就進入jdbc代理中,解析sql,根據類型選擇不一樣的執行器

3.使用 select for update 查詢獲取原始當前update的記錄當前值,就是修改前的值

  • 4.執行本來的update修改記錄的值

  • 5.使用 select for update 查詢獲取update後的值,就是修改後的值

  • 6.根據修改前和修改後的值,生成undolog,並根據主鍵的值生成lockkey放入context中

  • 5.向TC請求,註冊分支事物並將lockkey傳給TC加上全局鎖

  • 6.若是註冊而且獲取鎖成功,將把undolog插入數據庫中

  • 7.提交本地事物,並向TC反饋執行結果 (本地事物中包含本來的執行語句和插入undolog)

咱們發現分支事物上的RM操做是基於statement和connection,在原來的connection上作了加強,用的時同一個物理connection,因此分支應用上的分支的定義爲一個本地事物,因此
在一個RPC實現中,一個方法中若是存在多個sql語句,那麼將會註冊多個分支,向TC註冊屢次,若是這個方法在一個本地事物中,那麼即便多個sql最終一塊兒提交,而且只會向TC註冊一次,undolog也是一塊兒插入數據庫的,這個地方注意下,若是這個connection是自動提交的,爲了讓update語句和插入undolog放在一個本地事物中,因此會將connection改成非自動提交,開啓一個事物,在用完只會在改成自動提交,不要影響應用程序.

咱們回顧下分支一階段的處理流程

二階段的處理流程

  • 1.若是全部的分支都返回執行成功,TC將當即釋放全局鎖,而且TC異步通知分支刪除undolog

  • 2.若是有一個分支事物返回執行失敗,則TC發起請求執行undolog,全部undolog執行成功了,釋放鎖,異步刪除undolog

如何實現讀已提交

目前官方給出的方案,在須要進行讀取全局事物已經提交的記錄的話,須要將select 語句後面加上for update,fescar發現加上排他鎖後,會去TC獲取對應的鎖,若是沒有鎖上就執行,鎖上就自旋等待,爲了不讀取未提交的記錄,後面全局事物回滾了,就致使髒讀,固然目前可能大部分應用均可以接受這種狀況,例如在扣商品份額的時候都會最終在校驗一次,
固然咱們也能夠藉助undolog實現一個read-view,讓這條sql語句讀取到這個全局事物尚未執行以前的數據.

目前fescar的實現方式是在sql解析後若是發現是select for update語句,將會進入SelectForUpdateExecutor執行器,無論是否是在一個全局事物中,都須要去TC看下是否被鎖住,這裏將不會獲取鎖,只是校驗鎖是否會釋放.

若是分支超時,實際已經執行成功,那麼確定是已經向TC註冊成功的,那麼若是TM發起回滾,分支能夠正常回滾,沒有毛病,若是超時後,分支本地事物尚未提交,那麼回滾請求已經到達分支,那麼將會回滾失敗,可是TC會重試不停進行回滾.

實現HA的挑戰

TC目前是一個單點,若是須要集羣部署,則須要一個服務發現的系統,讓TC能夠自動擴展,應用不須要關心TC具體節點,而TC的全局鎖就不能直接放內存了,可能須要藉助第三方存儲系統,mysql或者etcd

實現XA的方案

可能須要在分支事物中,當解析到在一個全局事物中,不會進行commit,等到全部分支都返回成功了,事物管理器發起commit請求給TC,而後TC在通知各個分支進行提交,和rollback流程差很少

一致性協議 raft

因爲paxos算法太複雜,咱們分析下raft協議是如何保證分佈式集羣下數據複製的一致性的.

  • 1.經過選舉保證集羣中只有一個leader,只有leader對外提供寫服務,leader將日誌廣播給follower,每一條日誌都按順序維護在一個隊列裏,全部節點的隊列裏有一個index來控制前面的是已經提交的,後面的是沒提交的,提交表明已經有超過半數的節點應答,leader先把日誌複製給全部follower,在收到半數節點應答後在通知follower,index位置來控制那些日誌是已經提交的,只有提交過的日誌,follower纔會提供給應用方使用

  • 二、選舉過程,當一個leader長時間沒有應答時,全部的follower均可以成爲candidate,向其餘follower節點發送投票請求,若是超過半數follower節點應答後這個candidate就會升級爲leader,爲了不全部的follower節點已經做爲candidate發起投票,引入隨機超時機制,每一個follower和leader的超時時間在必定範圍內隨機,當candidate發起投票沒有結果時,隨機等待必定時間。

  • 3.candidate的日誌長度要大於等於半數follower節點的日誌才能成爲leader,因此發起投票的時候若是follower發現本身的日誌長度大於後選擇的就會投反對票

  • 4.日誌補齊,當leader發生故障的時候,各個follower上的狀態不同,因此新leader產生後須要補齊全部follow的日誌,並且新leander的日誌也不必定是最長的,可是foller日誌上面沒有的日誌確定是未提交的,這個時候補齊就能夠

  • 5.老leader復活,每一次選舉到下一次選舉的時間稱爲一個term任期,每個任期內都會維護一個數字並自增,當leader發送複製請求的時候會帶上term編號,若是follower發現term比本身小就拒絕,

raft設計中只有兩個rpc請求,一個選舉,一個複製日誌,複製日誌順便作了心跳檢測,當沒有日誌複製的時候發送空日誌,觸發選舉的惟一條件是 election timeout到期,每個節點的 election timeout都會將本身設置爲candidate而後發起投票,每一個節點的election timeout都會存在一個隨機值,因此不一樣,當一個節點被選爲leader後會按期向全部的follower發送心跳包,follower收到心跳包後會延長election timeout的值。節點選舉的時候term值大的會優先於term值小的,每一輪選舉term值都會加1.

相關文章
相關標籤/搜索