分佈式事務是企業集成中的一個技術難點,也是每個分佈式系統架構中都會涉及到的一個東西,特別是在這幾年愈來愈火的微服務架構中,幾乎能夠說是沒法避免,本文就圍繞單機事務,分佈式事務以及分佈式事務的處理方式來展開。java
事務提供一種「要麼什麼都不作,要麼作全套(All or Nothing)」的機制,她有ACID四大特性mysql
以mysql的InnoDB存儲引擎爲例,來了解單機事務是如何保證ACID特性的。 git
單機事務是經過將操做限制在一個會話內經過數據庫自己的鎖以及日誌來實現ACID,那麼分佈式環境下該如何保證ACID特性那?github
X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 是X/Open 這個組織定義的一套分佈式事務的標準,也就是了定義了規範和API接口,由各個廠商進行具體的實現。 X/Open DTP 定義了三個組件: AP,TM,RM
web
其中在DTP定義瞭如下幾個概念spring
若是一個事務管理器管理着多個資源管理器,DTP是經過兩階段提交協議來控制全局事務和分支事務。sql
TCC(Try-Confirm-Cancel)分佈式事務模型相對於 XA 等傳統模型,其特徵在於它不依賴資源管理器(RM)對分佈式事務的支持,而是經過對業務邏輯的分解來實現分佈式事務。
數據庫
3PC的CanCommit階段其實和2PC的準備階段很像。協調者向參與者發送commit請求,參與者若是能夠提交就返回Yes響應,不然返回No響應。
事務詢問:協調者向參與者發送CanCommit請求。詢問是否能夠執行事務提交操做。而後開始等待參與者的響應
響應反饋:參與者接到CanCommit請求以後,正常狀況下,若是其自身認爲能夠順利執行事務,則返回Yes響應,並進入預備狀態;不然反饋No。apache
執行事務預提交
發送預提交請求:協調者向全部參與者節點發出 preCommit 的請求,並進入 prepared 狀態。
事務預提交:參與者受到 preCommit 請求後,會執行事務操做,對應 2PC 準備階段中的 「執行事務」,也會 Undo 和 Redo 信息記錄到事務日誌中。
各參與者響應反饋:若是參與者成功執行了事務,就反饋 ACK 響應,同時等待指令:提交(commit) 或終止(abort)編程
中斷事務
發送中斷請求:協調者向全部參與者節點發出 abort 請求 。
中斷事務:參與者若是收到 abort 請求或者超時了,都會中斷事務。
執行提交
發送提交請求:協調者接收到各參與者發送的ACK響應,那麼他將從預提交狀態進入到提交狀態。並向全部參與者發送 doCommit 請求。
事務提交:參與者接收到 doCommit 請求以後,執行正式的事務提交。並在完成事務提交以後釋放全部事務資源。
響應反饋:事務提交完以後,向協調者發送 ACK 響應。
完成事務:協調者接收到全部參與者的 ACK 響應以後,完成事務。
中斷事務
協調者沒有接收到參與者發送的 ACK 響應(多是接受者發送的不是ACK響應,也可能響應超時),那麼就會執行中斷事務。
發送中斷請求:協調者向全部參與者發送 abort 請求。
事務回滾:參與者接收到 abort 請求以後,利用其在階段二記錄的 undo 信息來執行事務的回滾操做,並在完成回滾以後釋放全部的事務資源。
反饋結果:參與者完成事務回滾以後,向協調者發送 ACK 消息。
中斷事務:協調者接收到參與者反饋的 ACK 消息以後,完成事務的中斷。
Saga的組成:
saga的執行順序有兩種:
Saga定義了兩種恢復策略:
Saga的注意事項
Saga架構
僅在同一個事務上下文中須要協調多種資源(即數據庫,以及消息主題或隊列)時,纔有必要使用 X/Open XA 接口。數據庫接入XA須要使用XA版的數據庫驅動,消息隊列要實現XA須要實現javax.transaction.xa.XAResource接口。
代碼以下:
public class UserService {
@Autowired
private UserDao userDao;
@Autowired
private LogDao logDao;
@Transactional
public void save(User user){
userDao.save(user);
logDao.save(user);
throw new RuntimeException();
}
}
@Resource
public class UserDao {
@Resource(name="jdbcTemplateA")
private JdbcTemplate jdbcTemplate;
public void save(User user){
jdbcTemplate.update("insert into user(name,age) values(?,?)",user.getName(),user.getAge());
}
}
@Repository
public class LogDao {
@Resource(name="jdbcTemplateB")
private JdbcTemplate jdbcTemplate;
public void save(User user){
jdbcTemplate.update("insert into log(name,age) values(?,?)",user.getName(),user.getAge());
}
}
複製代碼
配置:
<bean id="jotm" class="org.objectweb.jotm.Current" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm" />
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
<!-- 配置數據源 -->
<bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm" />
<property name="driverName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8" />
</bean>
</property>
<property name="user" value="xxx" />
<property name="password" value="xxx" />
</bean>
<!-- 配置數據源 -->
<bean id="dataSourceB" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="jotm" />
<property name="driverName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf-8" />
</bean>
</property>
<property name="user" value="xxx" />
<property name="password" value="xxx" />
</bean>
<bean id="jdbcTemplateA" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceA" />
</bean>
<bean id="jdbcTemplateB" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceB" />
</bean>
複製代碼
使用到的JAR包:
compile 'org.ow2.jotm:jotm-core:2.3.1-M1'
compile 'org.ow2.jotm:jotm-datasource:2.3.1-M1'
compile 'com.experlog:xapool:1.5.0'
複製代碼
事務配置: 咱們知道分佈式事務中須要一個事務管理器即接口javax.transaction.TransactionManager、面向開發人員的javax.transaction.UserTransaction。對於jotm來講,他們的實現類都是Current
public class Current implements UserTransaction, TransactionManager
咱們若是想使用分佈式事務的同時,又想使用Spring帶給咱們的@Transactional便利,就須要配置一個JtaTransactionManager,而該JtaTransactionManager是須要一個userTransaction實例的,因此用到了上面的Current,以下配置:
<bean id="jotm" class="org.objectweb.jotm.Current" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="jotm" />
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
複製代碼
同時上述StandardXADataSource是須要一個TransactionManager實例的,因此上述StandardXADataSource配置把jotm加了進去.
執行過程:
代碼同上,配置爲:
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<tx:annotation-driven transaction-manager="springTransactionManager"/>
<!-- 配置數據源 -->
<bean id="dataSourceC" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="XA1DBMS" />
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="URL">jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8</prop>
<prop key="user">xxx</prop>
<prop key="password">xxx</prop>
</props>
</property>
<property name="poolSize" value="3" />
<property name="minPoolSize" value="3" />
<property name="maxPoolSize" value="5" />
</bean>
<!-- 配置數據源 -->
<bean id="dataSourceD" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="XA2DBMS" />
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="URL">jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf-8</prop>
<prop key="user">xxx</prop>
<prop key="password">xxx</prop>
</props>
</property>
<property name="poolSize" value="3" />
<property name="minPoolSize" value="3" />
<property name="maxPoolSize" value="5" />
</bean>
<bean id="jdbcTemplateC" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceC" />
</bean>
<bean id="jdbcTemplateD" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSourceD" />
</bean>
複製代碼
事務配置:
咱們知道分佈式事務中須要一個事務管理器即接口javax.transaction.TransactionManager、面向開發人員的javax.transaction.UserTransaction。對於Atomikos來講分別對應以下:
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="userTransaction" />
</bean>
<tx:annotation-driven transaction-manager="springTransactionManager"/>
複製代碼
能夠對比下jotm的案例配置jotm的分佈式事務配置。能夠看到jotm中使用的xapool中的StandardXADataSource是須要一個transactionManager的,而Atomikos使用的AtomikosNonXADataSourceBean則不須要。咱們知道,StandardXADataSource中有了transactionManager就能夠獲取當前線程的事務,同時把XAResource加入進當前事務中去,而AtomikosNonXADataSourceBean卻沒有,它是怎麼把XAResource加入進當前線程綁定的事務呢?這時候就須要能夠經過靜態方法隨時獲取當前線程綁定的事務。 使用到的JAR包:
compile 'com.atomikos:transactions-jdbc:4.0.0M4'
複製代碼
以訂單子系統和支付子系統爲例,以下圖:
begin tx;
count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount if (count <= 0) return false update payment_record set status = paid where trade_id = ${tradeId}
commit;
複製代碼
對於trade要執行的操做能夠用僞代碼表示以下:
begin tx;
count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid if (count <= 0) return false do other things ... commit;
複製代碼
可是對於這兩段代碼如何串起來是個問題,咱們增長一個事務表,即圖中的tx_info,來記錄成功完成的支付事務,tx_info中須要有能夠標示被支付系統處理狀態的字段,爲了和支付信息一致,須要放入事務中,代碼以下:
begin tx;
count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount if (count <= 0) return false update payment_record set status = paid where trade_id = ${tradeId}
insert into tx_info values(${trade_id},${amount}...) commit;
複製代碼
支付系統邊界到此爲止,接下來就是訂單系統輪詢訪問tx_info,拉取已經支付成功的訂單信息,對每一條信息都執行trade系統的邏輯,僞代碼以下:
foreach trade_id in tx_info
do trade_tx
save tx_info.id to some store
複製代碼
事無延遲取決於時間程序輪詢間隔,這樣咱們作到了一致性,最終訂單都會在支付以後的最大時間間隔內完成狀態遷移。
固然,這裏也能夠採用支付系統經過RPC方式同步通知訂單系統的方式來實現,處理狀態經過tx_info中的字段來表示。
另外,交易系統每次拉取數據的起點以及消費記錄須要記錄下來,這樣才能不遺漏不重複地執行,因此須要增長一張表用於排重,即上圖中的tx_duplication。可是每次對tx_duplication表的插入要在trade_tx的事務中完成,僞代碼以下:
begin tx;
c = insert ignore tx_duplication values($trade_id...) if (c <= 0) return false count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid if (count <= 0) return false do other things ... commit;
複製代碼
另外,tx_duplication表中trade_id表上必須有惟一鍵,這個算是結合以前的冪等篇來保證trade_tx的操做是冪等的。
在上面的方案中,tx_info表所起到的做用就是隊列做用,記錄一個系統的表更,做爲通知給須要感知的系統的事件。而時間程序去拉取只是系統去獲取感興趣事件的一個方式,而對應交易系統的本地事務只是對應消費事件的一個過程。在這樣的描述下,這些功能就是一個MQ——消息中間件。以下圖
sendPrepare();
isCommit = local_tx()
if (isCommit) sendCommit()
else sendRollback()
複製代碼
在作本地事務以前,先向MQ發送一個prepare消息,而後執行本地事務,本地事務提交成功的話,向MQ發送一個commit消息,不然發送一個abort消息,取消以前的消息。MQ只會在收到commit確認纔會將消息投遞出去,因此這樣的形式能夠保證在一切正常的狀況下,本地事務和MQ能夠達到一致性。
可是分佈式存在異常狀況,網絡超時,機器宕機等等,好比當系統執行了local_tx()成功以後,還沒來得及將commit消息發送給MQ,或者說發送出去了,網絡超時了等等緣由,MQ沒有收到commit,即commit消息丟失了,那麼MQ就不會把prepare消息投遞出去。若是這個沒法保證的話,那麼這個方案是不可行的。針對這種狀況,須要一個第三方異常校驗模塊來對MQ中在必定時間段內沒有commit/abort 的消息和發消息的系統進行檢查,確認該消息是否應該投遞出去或者丟棄,獲得系統的確認以後,MQ會作投遞仍是丟棄,這樣就徹底保證了MQ和發消息的系統的一致性,從而保證了接收消息系統的一致性。
這個方案要求MQ的系統可用性必須很是高,至少要超過使用MQ的系統(推薦rocketmq,kafka都支持發送預備消息和業務回查),這樣才能保證依賴他的系統能穩定運行。
項目地址:github.com/apache/serv… Saga處理場景是要求相關的子事務提供事務處理函數同時也提供補償函數。Saga協調器alpha會根據事務的執行狀況向omega發送相關的指令,肯定是否向前重試或者向後恢復。
成功場景下,每一個事務都會有開始和有對應的結束事件。
異常場景下,omega會向alpha上報中斷事件,而後alpha會向該全局事務的其它已完成的子事務發送補償指令,確保最終全部的子事務要麼都成功,要麼都回滾。
超時場景下,已超時的事件會被alpha的按期掃描器檢測出來,與此同時,該超時事務對應的全局事務也會被中斷。
假設要租車、預訂酒店知足分佈式事務。
租車服務
@Service
class CarBookingService {
private Map<Integer, CarBooking> bookings = new ConcurrentHashMap<>();
@Compensable(compensationMethod = "cancel")
void order(CarBooking booking) {
booking.confirm();
bookings.put(booking.getId(), booking);
}
void cancel(CarBooking booking) {
Integer id = booking.getId();
if (bookings.containsKey(id)) {
bookings.get(id).cancel();
}
}
Collection<CarBooking> getAllBookings() {
return bookings.values();
}
void clearAllBookings() {
bookings.clear();
}
}
複製代碼
酒店預訂
@Service
class HotelBookingService {
private Map<Integer, HotelBooking> bookings = new ConcurrentHashMap<>();
@Compensable(compensationMethod = "cancel")
void order(HotelBooking booking) {
if (booking.getAmount() > 2) {
throw new IllegalArgumentException("can not order the rooms large than two");
}
booking.confirm();
bookings.put(booking.getId(), booking);
}
void cancel(HotelBooking booking) {
Integer id = booking.getId();
if (bookings.containsKey(id)) {
bookings.get(id).cancel();
}
}
Collection<HotelBooking> getAllBookings() {
return bookings.values();
}
void clearAllBookings() {
bookings.clear();
}
}
複製代碼
主服務
@RestController
public class BookingController {
@Value("${car.service.address:http://car.servicecomb.io:8080}")
private String carServiceUrl;
@Value("${hotel.service.address:http://hotel.servicecomb.io:8080}")
private String hotelServiceUrl;
@Autowired
private RestTemplate template;
@SagaStart
@PostMapping("/booking/{name}/{rooms}/{cars}")
public String order(@PathVariable String name, @PathVariable Integer rooms, @PathVariable Integer cars) {
template.postForEntity(
carServiceUrl + "/order/{name}/{cars}",
null, String.class, name, cars);
postCarBooking();
template.postForEntity(
hotelServiceUrl + "/order/{name}/{rooms}",
null, String.class, name, rooms);
postBooking();
return name + " booking " + rooms + " rooms and " + cars + " cars OK";
}
// This method is used by the byteman to inject exception here
private void postCarBooking() {
}
// This method is used by the byteman to inject the faults such as the timeout or the crash
private void postBooking() {
}
}
複製代碼
執行流程
項目地址https://github.com/QNJR-GROUP/EasyTransaction[對比tcc-transaction,Hmily,ByteTCC來講EasyTransaction性能最好,壓測未發現錯誤], 固然你也可使用上面提到的SAGA項目,也是支持TCC協議的。下面咱們舉個例子來看TCC是如何處理業務邏輯的。
eg:訂單支付
try階段
confirm階段
cancel階段
基本概念 | 優勢 | 缺點 |
---|---|---|
本地事務。事務由資源管理器(如DBMS)本地管理 | 嚴格的ACID | 不具有分佈事務處理能力 |
全局事務(DTP模型) TX協議:應用或應用服務器與事務管理器的接口 XA協議:全局事務管理器與資源管理器的接口 |
嚴格的ACID | 效率很是低 |
JTA:面向應用、應用服務器與資源管理器的高層事務接口 JTS:JTA事務管理器的實現標準,向上支持JTA,向下經過CORBA OTS實現跨事務域的互操做性 EJB |
簡單一致的編程模型 跨域分佈處理的ACID保證 |
DTP模型自己的侷限 缺乏充分公開的大規模、高可用、密集事務應用的成功案例 |
基於MQ | 消息數據獨立存儲、獨立伸縮 下降業務系統與消息系統間的耦合 |
一次消息發送須要兩次請求 業務處理服務需實現消息狀態回查接口 |
二階段提交 | 原理簡單,實現方便 | 同步阻塞:在二階段提交的過程當中,全部的節點都在等待其餘節點的響應,沒法進行其餘操做。這種同步阻塞極大的限制了分佈式系統的性能。 單點問題:協調者在整個二階段提交過程當中很重要,若是協調者在提交階段出現問題,那麼整個流程將沒法運轉。更重要的是,其餘參與者將會處於一直鎖定事務資源的狀態中,而沒法繼續完成事務操做。 數據不一致:假設當協調者向全部的參與者發送commit請求以後,發生了局部網絡異常,或者是協調者在還沒有發送完全部 commit請求以前自身發生了崩潰,致使最終只有部分參與者收到了commit請求。這將致使嚴重的數據不一致問題。 容錯性很差:若是在二階段提交的提交詢問階段中,參與者出現故障,致使協調者始終沒法獲取到全部參與者的確認信息,這時協調者只能依靠其自身的超時機制,判斷是否須要中斷事務。顯然,這種策略過於保守。換句話說,二階段提交協議沒有設計較爲完善的容錯機制,任意一個節點是失敗都會致使整個事務的失敗。 |
TCC | 相對於二階段提交,三階段提交主要解決的單點故障問題,並減小了阻塞的時間。由於一旦參與者沒法及時收到來自協調者的信息以後,他會默認執行 commit。而不會一直持有事務資源並處於阻塞狀態。 | 三階段提交也會致使數據一致性問題。因爲網絡緣由,協調者發送的 abort 響應沒有及時被參與者接收到,那麼參與者在等待超時以後執行了 commit 操做。這樣就和其餘接到 abort 命令並執行回滾的參與者之間存在數據不一致的狀況。 |
SAGA | 簡單業務使用TCC須要修改原來業務邏輯,saga只須要添加一個補償動做 因爲沒有預留動做因此不用擔憂資源釋放的問題異常處理簡單 |
因爲沒有預留動做致使補償處理麻煩 |
業務各有各的不一樣,有些業務能容忍短時間不一致,有些業務的操做能夠冪等,不管什麼樣的分佈式事務解決方案都有其優缺點,沒有一個銀彈可以適配全部。所以,業務須要什麼樣的解決方案,還須要結合自身的業務需求、業務特色、技術架構以及各解決方案的特性,綜合分析,才能找到最適合的方案。