seata是阿里巴巴研發的一套開源分佈式事務框架,提供了AT、TCC、SAGA 和 XA 幾種事務模式。本文以精品課項目組的物流後臺服務爲例,介紹seata框架落地的過程,遇到的問題以及解決方案。java
做者/ 鄧新偉mysql
編輯/ 網易有道redis
有道精品課教務系統是基於springcloud的分佈式集羣服務。在實際業務中,存在許多分佈式事務場景。然而傳統的事務框架是沒法實現全局事務的。長期以來,咱們的分佈式場景的一致性,每每指的是放棄強一致性,保證最終一致性。spring
咱們從調研中發現,seata框架既能夠知足業務需求,靈活兼容多種事務模式,又能夠實現數據強一致性。sql
本文以物流業務爲例,記錄了在實際業務中落地seata框架落地的過程當中遇到的一些問題以及解決方案,供你們學習討論~歡迎你們在留言區討論交流數據庫
seata框架分爲3個組件:segmentfault
維護全局和分支事務的狀態,驅動全局事務提交或回滾。api
定義全局事務的範圍:開始全局事務、提交或回滾全局事務。架構
管理分支事務處理的資源,與TC交談以註冊分支事務和報告分支事務的狀態,並驅動分支事務提交或回滾oracle
在官網下載 seata 服務端,解壓後執行bin/seata-server.sh便可啓動。
seata-server 有2個配置文件:registry.conf 與 file.conf。而 registry.conf 文件決定了 seata-server 使用的註冊中心配置和配置信息獲取方式。
咱們使用 consul 作註冊中心,所以須要在registry.conf文件中,須要修改如下配置:
registry { #file 、nacos 、eureka、redis、zk、consul、etcd三、sofa type = "consul" ## 這裏註冊中心填consul loadBalance = "RandomLoadBalance" loadBalanceVirtualNodes = 10 ... ... consul { cluster = "seata-server" serverAddr = "***註冊中心地址***" #這裏的dc指的是datacenter,若consul爲多數據源配置須要在請求中加入dc參數。 #dc與namespace並不是是seata框架自帶的,文章後面將會進一步解釋 dc="bj-th" namespace="seata-courseop" } ... ... } config { # file、nacos 、apollo、zk、consul、etcd3 ## 若是啓動時從註冊中心獲取基礎配置信息,填consul ## 不然從file.conf文件中獲取 type = "consul" consul { serverAddr = "127.0.0.1:8500" } ... ... }
其中須要注意的是,若是須要高可用部署,seata獲取配置信息的方式就必須是註冊中心,此時file.conf就沒用了。
(固然,須要事先把file.conf文件中的配置信息遷移到consul中)
store { ## store mode: file、db、redis mode = "db" ... ... ## database store property ## 若是使用數據庫模式,須要配置數據庫鏈接設置 db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" url = "jdbc:mysql://***線上數據庫地址***/seata" user = "******" password = "******" minConn = 5 maxConn = 100 ## 這裏的三張表須要提早在數據庫建好 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } ... ... } service { #vgroup->rgroup vgroupMapping.tx-seata="seata-server" default.grouplist="127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" }
其中,global_table, branch_table, lock_table三張表須要提早在數據庫中建好。
每一個使用seata框架的服務都須要引入seata組件
dependencies { api 'com.alibaba:druid-spring-boot-starter:1.1.10' api 'mysql:mysql-connector-java:6.0.6' api('com.alibaba.cloud:spring-cloud-alibaba-seata:2.1.0.RELEASE') { exclude group:'io.seata', module:'seata-all' } api 'com.ecwid.consul:consul-api:1.4.5' api 'io.seata:seata-all:1.4.0' }
每一個服務都一樣須要配置file.conf與registry.conf文件,放在resource目錄下。registry.conf與server的保持一致。在file.conf文件中,除了db配置外,還須要進行client參數的配置:
client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" ## 這個undo_log也須要提早在mysql中建立 logTable = "undo_log" } log { exceptionRate = 100 } }
在application.yml文件中添加seata配置:
spring: cloud: seata: ## 注意tx-seata須要與服務端和客戶端的配置文件保持一致 tx-service-group: tx-seata
另外,還須要替換項目的數據源,
@Primary @Bean("dataSource") public DataSource druidDataSource(){ DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(url); druidDataSource.setUsername(username); druidDataSource.setPassword(password); druidDataSource.setDriverClassName(driverClassName); return new DataSourceProxy(druidDataSource); }
至此,client端的配置也已經完成了。
一個分佈式的全局事務,總體是兩階段提交的模型。
全局事務是由若干分支事務組成的,
分支事務要知足兩階段提交的模型要求,即須要每一個分支事務都具有本身的:
根據兩階段行爲模式的不一樣,咱們將分支事務劃分爲 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.
AT 模式基於支持本地ACID事務的關係型數據庫:
直接在須要添加全局事務的方法中加上註解@GlobalTransactional
@SneakyThrows @GlobalTransactional @Transactional(rollbackFor = Exception.class) public void buy(int id, int itemId){ // 先生成訂單 Order order = orderFeignDao.create(id, itemId); // 根據訂單扣減帳戶餘額 accountFeignDao.draw(id, order.amount); }
注意:同@Transactional同樣,@GlobalTransactional若要生效也要知足:
TCC 模式是支持把自定義的分支事務歸入到全局事務的管理中。
首先編寫一個TCC服務接口:
@LocalTCC public interface BusinessAction { @TwoPhaseBusinessAction(name = "doBusiness", commitMethod = "commit", rollbackMethod = "rollback") boolean doBusiness(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "message") String msg); boolean commit(BusinessActionContext businessActionContext); boolean rollback(BusinessActionContext businessActionContext); }
其中,BusinessActionContext爲全局事務上下文,能夠今後對象中獲取全局事務相關信息(若是是發起全局事務方,傳入null後自動生成),而後實現該接口:
@Slf4j @Service public class BusinessActionImpl implements BusinessAction { @Transactional(rollbackFor = Exception.class) @Override public boolean doBusiness(BusinessActionContext businessActionContext, String msg) { log.info("準備do business:{}",msg); return true; } @Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { log.info("business已經commit"); return true; } @Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { log.info("business已經rollback"); return true; } }
最後,開啓全局事務方法同AT模式。
@SneakyThrows @GlobalTransactional public void doBusiness(BusinessActionContext context, String msg){ accountFeignDao.draw(3, new BigDecimal(100)); businessAction.doBusiness(context, msg); }
在部署seata項目時經常會遇到這樣的問題:在本地調試時一切正常,可是當試圖部署到線上時,老是在clinet端提示註冊TC端失敗。
seata服務的高可用部署只支持註冊中心模式。所以,咱們須要想辦法將file.conf文件以鍵值對的形式存到consul中。
遺憾的是,consul並無顯式支持namespace,咱們只能在put請求中用「/」爲分隔符起到相似的效果。固然,seata框架也沒有考慮到這一點。因此咱們須要修改源碼中的Configuration接口與RegistryProvider接口的consul實現類,增長namespace屬性
TC在想mysql插入日誌數據時,偶爾會報:
Caused by: java.sql.SQLException: Incorrect string value:
application_data字段其實就是對業務數據的記錄。官方給出的建表語句是這樣的:
CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8;
顯然,VARCHAR(2000)的大小是不合適的, utf8的格式也是不合適的。因此咱們須要修改seata關於數據源鏈接的部分代碼:
// connectionInitSql設置 protected Set<String> getConnectionInitSqls(){ Set<String> set = new HashSet<>(); String connectionInitSqls = CONFIG.getConfig(ConfigurationKeys.STORE_DB_CONNECTION_INIT_SQLS); if(StringUtils.isNotEmpty(connectionInitSqls)) { String[] strs = connectionInitSqls.split(","); for(String s:strs){ set.add(s); } } // 默認支持utf8mb4 set.add("set names utf8mb4"); return set; }
seata基於java的spi機制提供了自定義實現接口的功能,咱們只須要在本身的服務中,根據seata的接口寫好本身的實現類便可。
SPI(Service Provider Interface)是JDK內置的服務發現機制,用在不一樣模塊間經過接口調用服務,避免對具體服務服務接口具體實現類的耦合。好比JDBC的數據庫驅動模塊,不一樣數據庫鏈接驅動接口相同但實現類不一樣,在使用SPI機制之前調用驅動代碼須要直接在類裏採用Class.forName(具體實現類全名)的方式調用,這樣調用方依賴了具體的驅動實現,在替換驅動實現時要修改代碼。
以ConsulRegistryProvider爲例:
ConsulRegistryServiceImpl
// 增長DC和namespace private static String NAMESPACE; private static String DC; private ConsulConfiguration() { Config registryCongig = ConfigFactory.parseResources("registry.conf"); NAMESPACE = registryCongig.getString("config.consul.namespace"); DC = CommonSeataConfiguration.getDatacenter(); consulNotifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("consul-config-executor", THREAD_POOL_NUM)); } ... ... // 同時在getHealthyServices中,刪除請求參數wait&index /** * get healthy services * * @param service * @return */ private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) { return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder() .setTag(SERVICE_TAG) .setDatacenter(DC) .setPassing(true) .build()); }
ConsulRegistryProvider 注意order要大於seata包中的默認值1,seata類加載器會優先加載order更大的實現類
@LoadLevel(name = "Consul" ,order = 2) public class ConsulRegistryProvider implements RegistryProvider { @Override public RegistryService provide() { return ConsulRegistryServiceImpl.getInstance(); } }
而後在META-INF 的services目錄下添加:io.seata.discovery.registry.RegistryProvider
com.youdao.ke.courseop.common.seata.ConsulRegistryProvider
這樣就能夠替換seata包中的實現了。
對於這些自定義實現類,以及一些公共client配置,咱們能夠統一封裝到一個工具包下:
這樣,其餘項目只須要引入這個工具包,就能夠無需繁瑣的配置,直接使用了。
gradle引入common包:
api 'com.youdao.ke.courseop.common:common-seata:0.0.+'
以一個物流場景爲例:
業務架構:
業務背景:logistics 執行領用單新增,在 elasticsearch 中更新數據,同時經過 rpc 調用 logistics-k3c 的金蝶出庫方法,生成金蝶單據,如圖2所示
問題:若是elasticsearch單據更新出現異常,金蝶單據將沒法回滾,形成數據不一致的問題。
在部署完seata線上服務後,只須要在logistics與logistics-k3c中分別引入common-seata工具包
logistics服務:
// 使用全局事務註解開啓全局事務 @GlobalTransactional @Transactional(rollbackFor = Exception.class) public void Scm經過(StaffOutStockDoc staffOutStock, String body) throws Exception { ... 一些業務處理... // 構建金蝶單據請求 K3cApi.StaffoutstockReq req = new K3cApi.StaffoutstockReq(); req.materialNums = materialNums; req.staffOutStockId = staffOutStock.id; ... 一些業務處理 ... // 調用logistics-k3c-api金蝶出庫 k3cApi.staffoutstockAuditPass(req); staffOutStock.status = 待發貨; staffOutStock.scmAuditTime = new Date(); staffOutStock.updateTime = new Date(); staffOutStock.historyPush("scm經過"); // 更新對象後存入elasticsearch es.set(staffOutStock); }
logistics-k3c:
因爲咱們新增單據接口是調用金蝶的服務,因此這裏使用TCC模式構建事務接口
首先建立StaffoutstockCreateAction接口
@LocalTCC public interface StaffoutstockCreateAction { @TwoPhaseBusinessAction(name = "staffoutstockCreate") boolean create(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "staffOutStock") StaffOutStock staffOutStock, @BusinessActionContextParameter(paramName = "materialNum") List<Triple<Integer, Integer, Integer>> materialNum); boolean commit(BusinessActionContext businessActionContext); boolean rollback(BusinessActionContext businessActionContext); }
接口實現StaffoutstockCreateActionImpl
@Slf4j @Service public class StaffoutstockCreateActionImpl implements StaffoutstockCreateAction { @Autowired private K3cAction4Staffoutstock k3cAction4Staffoutstock; @SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean create(BusinessActionContext businessActionContext, StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum) { //金蝶單據新增 k3cAction4Staffoutstock.staffoutstockAuditPass(staffOutStock, materialNum); return true; } @SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { Map<String, Object> context = businessActionContext.getActionContext(); JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock"); // 若是嘗試新增成功,commit不作任何事 StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class); log.info("staffoutstock {} commit successfully!", staffOutStock.id); return true; } @SneakyThrows @Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { Map<String, Object> context = businessActionContext.getActionContext(); JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock"); StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class); // 這裏調用金蝶單據刪除接口進行回滾 k3cAction4Staffoutstock.staffoutstockRollback(staffOutStock); log.info("staffoutstock {} rollback successfully!", staffOutStock.id); return true; } }
封裝爲業務方法
/** * 項目組領用&報廢的審覈經過:新增其餘出庫單 * 該方法使用seata-TCC方案實現全局事務 * @param staffOutStock * @param materialNum */ @Transactional public void staffoutstockAuditPassWithTranscation(StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum){ staffoutstockCreateAction.create(null, staffOutStock, materialNum); }
k3c API實現類
@SneakyThrows @Override public void staffoutstockAuditPass(StaffoutstockReq req) { ... 一些業務處理方法 ... //這裏調用了封裝好的事務方法 k3cAction4Staffoutstock.staffoutstockAuditPassWithTranscation(staffOutStock, triples); }
這樣,一個基於 TCC 的全局事務鏈路就創建起來了。
當全局事務執行成功時,咱們能夠在 server 中看到打印的日誌(如圖3):
若是全局事務執行失敗,會進行回滾,此時會執行接口中的rollback,調用金蝶接口刪除生成的單據,如圖4。
本文以seata框架的部署與使用爲主線,記錄了seata 框架運用的一些關鍵步驟與技術細節,並針對項目落地時遇到的一些的技術問題提供瞭解決方案。
在後續的推文中,咱們還將繼續以 seata 框架的源碼解析爲主線,向你們介紹 seata 實現分佈式事務的核心原理與技術細節。
-END-