本文目錄:java
後端工具和環境node
在開發個人開源項目 prex 時,加入工做流,解決工做流用戶與當前系統用戶同步問題時,涉及到遠程調用操做兩個數據庫所產生的事務問題,好比系統用戶在增長用戶同步工做流用戶時,系統用戶添加成功,工做流用戶沒有添加成功,則形成數據不一致問題,本地事務沒法回滾,那麼則使用分佈式事務解決方案。
開源項目:gitee.com/kaiyuantuan…mysql
指一次大的操做由不一樣的小操做組成的,這些小的操做分佈在不一樣的服務器上,分佈式事務須要保證這些小操做要麼所有成功,要麼所有失敗。從本質上來講,分佈式事務就是爲了保證不一樣數據庫的數據一致性。
通俗一點說就是單體應用被拆分紅微服務應用,原來的一個模塊被拆分紅三個獨立的應用,分別使用獨立的數據源,業務操做須要調用三個服務來完成。git
分佈式事務做爲微服務應用中的大難題,在現有的解決方案中,我的認爲 Seata
是目前最輕量的解決方案github
Seata
是一款開源的分佈式事務解決方案,致力於提供高性能和簡單易用的分佈式事務服務。Seata
將爲用戶提供了 AT、TCC、SAGA 和 XA 事務模式,爲用戶打造一站式的分佈式解決方案。redis
兩階段提交協議的演變:spring
以一個示例來講明:
兩個全局事務 tx1 和 tx2,分別對 a 表的 m 字段進行更新操做,m 的初始值 1000。sql
tx1 先開始,開啓本地事務,拿到本地鎖,更新操做 m = 1000 - 100 = 900。本地事務提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 後開始,開啓本地事務,拿到本地鎖,更新操做 m = 900 - 100 = 800。本地事務提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 須要重試等待 全局鎖 。數據庫
tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務。後端
若是 tx1 的二階段全局回滾,則 tx1 須要從新獲取該數據的本地鎖,進行反向補償的更新操做,實現分支的回滾。
此時,若是 tx2 仍在等待該數據的 全局鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全局鎖 等鎖超時,放棄 全局鎖 並回滾本地事務釋放本地鎖,tx1 的分支回滾最終成功。
由於整個過程 全局鎖 在 tx1 結束前一直是被 tx1 持有的,因此不會發生 髒寫 的問題。
在數據庫本地事務隔離級別 讀已提交(Read Committed) 或以上的基礎上,Seata(AT 模式)的默認全局隔離級別是 讀未提交(Read Uncommitted) 。
若是應用在特定場景下,必須要求全局的 讀已提交 ,目前 Seata 的方式是經過 SELECT FOR UPDATE 語句的代理。
SELECT FOR UPDATE 語句的執行會申請 全局鎖 ,若是 全局鎖 被其餘事務持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執行)並重試。這個過程當中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關數據是 已提交 的,才返回。
出於整體性能上的考慮,Seata 目前的方案並無對全部 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。
以一個示例來講明整個 AT 分支的工做過程。
業務表:product
Field | Type | Key |
---|---|---|
id | bigint(20) | PRI |
name | varchar(100) | |
since | varchar(100) |
AT 分支事務的業務邏輯:
update product set name = 'GTS' where name = 'TXC';
過程:
select id, name, since from product where name = 'TXC';
複製代碼
獲得前鏡像:
id name since 1 TXC 2014
select id, name, since from product where id = 1`;
複製代碼
獲得後鏡像:
id name since 1 GTS 2014
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
複製代碼
update product set name = 'TXC' where id = 1;
複製代碼
UNDO_LOG Table:不一樣數據庫在類型上會略有差異。
以 MySQL 爲例:
Field | Type |
---|---|
branch_id | bigint PK |
xid | varchar(100) |
context | varchar(128) |
rollback_info | longblob |
log_status | tinyint |
log_created | datetime |
log_modified | datetime |
-- 注意此處0.7.0+ 增長字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
複製代碼
回顧總覽中的描述:一個分佈式的全局事務,總體是 兩階段提交 的模型。全局事務是由若干分支事務組成的,分支事務要知足 兩階段提交 的模型要求,即須要每一個分支事務都具有本身的:
根據兩階段行爲模式的不一樣,咱們將分支事務劃分爲 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式(參考連接 TBD)基於 支持本地 ACID 事務 的 關係型數據庫:
相應的,TCC 模式,不依賴於底層數據資源的事務支持:
Saga 模式是 SEATA 提供的長事務解決方案,在 Saga 模式中,業務流程中每一個參與者都提交本地事務,當出現某一個參與者失敗則補償前面已經成功的參與者,一階段正向服務和二階段補償服務都由業務開發實現。
理論基礎:Hector & Kenneth 發表論⽂ Sagas (1987)
Nacos
做爲註冊中心,Nacos 的安裝及使用能夠參考seata-server
,這裏下載的是 seata-server-0.9.0.zip,下載地址:github.com/seata/seata…seata安裝包
快速獲取百度雲下載連接解壓完成後咱們獲得了幾個文件夾
seata server
全部的配置都在 conf 文件夾內,該文件夾內有兩個文件咱們必需要詳細介紹下。
seata server
默認使用 file(文件方式)進行存儲事務日誌、事務運行信息,咱們能夠經過-m db 腳本參數的形式來指定,目前僅支持 file、db 這兩種方式。
修改 conf 目錄下的 file.conf 配置文件,主要修改自定義事務組名稱,事務日誌存儲模式及數據庫鏈接信息
transport {
...省略
}
service {
#vgroup->rgroup
vgroup_mapping.prex_tx_group = "default" #修改事務組名稱爲:prex_tx_group,和客戶端自定義的名稱對應
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
## transaction log store
store {
## store mode: file、db
mode = "db" #修改此處將事務信息存儲到db數據庫中
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "druid"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/seat" #修改數據庫鏈接地址
user = "root" #修改數據庫用戶名
password = "root" #修改數據庫密碼
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
複製代碼
說明:
registry.conf
配置文件,指明註冊中心爲 nacos,及修改 nacos 鏈接信息便可;registry {
# file 、nacos 、eureka、redis、zk、consul、etcd三、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
... 省略
}
}
複製代碼
啓動 seata server 的腳本位於 bin 文件內,Linux/Mac
環境使用 seata-server.sh 腳本啓動,Windows 環境使用 seata-server.bat 腳本啓動。
Linux/Mac
啓動方式示例以下所示:
nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m db &> seata.log &
複製代碼
經過 nohup 命令讓 seata server 在系統後臺運行。
腳本參數:
當咱們看到-Server started 時並未發現其餘錯誤信息,咱們的 seata server 已經啓動成功
讓咱們從一個微服務示例開始 用戶購買商品的業務邏輯。整個業務邏輯由 3 個微服務提供支持:
建立業務數據庫
db-order:存儲訂單的數據庫
db-storage:存儲庫存的數據庫
db-account:存儲帳戶信息的數據庫
order 訂單表:
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
`id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵Id',
`user_id` int(20) DEFAULT NULL COMMENT '用戶Id',
`pay_money` decimal(11,0) DEFAULT NULL COMMENT '付款金額',
`product_id` int(20) DEFAULT NULL COMMENT '商品Id',
`status` int(11) DEFAULT NULL COMMENT '狀態',
`count` int(11) DEFAULT NULL COMMENT '商品數量',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='訂單表';
SET FOREIGN_KEY_CHECKS = 1;
複製代碼
product 商品表:
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
`id` int(20) NOT NULL COMMENT '主鍵',
`product_id` int(11) DEFAULT NULL COMMENT '商品Id',
`price` decimal(11,0) DEFAULT NULL COMMENT '價格',
`count` int(11) DEFAULT NULL COMMENT '庫存數量',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='倉儲服務';
-- ----------------------------
-- Records of product
-- ----------------------------
BEGIN;
INSERT INTO `product` VALUES (1, 1, 50, 100);
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
複製代碼
account 帳戶表:
DROP TABLE IF EXISTS `account`;
CREATE TABLE `account` (
`id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵Id',
`user_id` int(20) DEFAULT NULL COMMENT '用戶Id',
`balance` decimal(11,0) DEFAULT NULL COMMENT '餘額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC;
-- ----------------------------
-- Records of account
-- ----------------------------
BEGIN;
INSERT INTO `account` VALUES (1, 1, 100);
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
複製代碼
須要在每一個數據庫中建立日誌回滾表,建表 sql 在 seata-server 的/conf/db_undo_log.sql 中。
三個服務,一個訂單服務,一個倉儲服務,一個帳戶服務。當用戶下單時,會在訂單服務中建立一個訂單,而後經過遠程調用庫存服務來扣減下單商品的庫存,再經過遠程調用帳戶服務來扣減用戶帳戶裏面的餘額,最後在訂單服務中修改訂單狀態爲已完成。該操做跨越三個數據庫,有兩次遠程調用,很明顯會有分佈式事務問題
複製代碼
nacos-seata-account-server 帳戶服務
nacos-seata-order-server 訂單服務
nacos-seata-storage-server 倉儲服務
對 nacos-seata-account-server、nacos-seata-order-server 和 nacos-seata-storage-server 三個 seata 的客戶端進行配置,它們配置大體相同,咱們下面以 nacos-seata-account-server 的配置爲例;
修改 application.yml 文件,自定義事務組的名稱
spring:
cloud:
alibaba:
seata:
tx-service-group: prex_tx_group #自定義事務組名稱須要與seata-server中的對應
複製代碼
service {
#vgroup->rgroup
vgroup_mapping.prex_tx_group = "default" #修改自定義事務組名稱
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
disableGlobalTransaction = false
}
複製代碼
添加並修改 registry.conf 配置文件,主要是將註冊中心改成 nacos
registry {
# file 、nacos 、eureka、redis、zk
type = "nacos" #修改成nacos
nacos {
serverAddr = "localhost:8848" #修改成nacos的鏈接地址
namespace = ""
cluster = "default"
}
}
複製代碼
代碼只展現核心代碼 具體代碼文章尾部連接
@EnableDiscoveryClient
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan("com.xd.example.seata.mapper")
public class NacosSeataAccountServerApplication {
public static void main(String[] args) {
SpringApplication.run(NacosSeataAccountServerApplication.class, args);
}
}
複製代碼
MyBatisPlusConfig:
/**
* @Classname MyBatisPlusConfig
* @Description 配置MybatisPlus使用Seata對數據源進行代理
* @Author Created by Lihaodong (alias:小東啊) im.lihaodong@gmail.com
* @Date 2019-11-25 11:21
* @Version 1.0
*/
@Configuration
public class MyBatisPlusConfig {
@Value("${mybatis-plus.mapper-locations}")
private String mapperLocations;
/**
* @param sqlSessionFactory SqlSessionFactory
* @return SqlSessionTemplate
*/
@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
/**
* 從配置文件獲取屬性構造datasource,注意前綴,這裏用的是druid,根據本身狀況配置,
* 原生datasource前綴取"spring.datasource"
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public DataSource hikariDataSource() {
return new HikariDataSource();
}
/**
* 構造datasource代理對象,替換原來的datasource
*
* @param hikariDataSource
* @return
*/
@Primary
@Bean("dataSource")
public DataSourceProxy dataSourceProxy(DataSource hikariDataSource) {
return new DataSourceProxy(hikariDataSource);
}
@Bean(name = "sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
bean.setMapperLocations(resolver.getResources(mapperLocations));
SqlSessionFactory factory = null;
try {
factory = bean.getObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
return factory;
}
/**
* MP 自帶分頁插件
*
* @return
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor page = new PaginationInterceptor();
page.setDialectType("mysql");
return page;
}
}
複製代碼
package com.xd.example.seata.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Order;
import com.xd.example.seata.mapper.OrderMapper;
import com.xd.example.seata.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xd.example.seata.service.RemoteAccountService;
import com.xd.example.seata.service.RemoteStorageService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>
* 訂單表 服務實現類
* </p>
*
* @author lihaodong
* @since 2019-11-25
*/
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
@Autowired
private RemoteStorageService remoteStorageService;
@Autowired
private RemoteAccountService remoteAccountService;
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void createOrder(Order order) {
log.info("下單開始,用戶:{},商品:{},數量:{},金額:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
//建立訂單
order.setStatus(0);
boolean save = save(order);
log.info("保存訂單{}", save ? "成功" : "失敗");
log.info("當前 XID: {}", RootContext.getXID());
//遠程調用庫存服務扣減庫存
log.info("扣減庫存開始");
remoteStorageService.decrease(order.getProductId(), order.getCount());
log.info("扣減庫存結束");
//遠程調用帳戶服務扣減餘額
log.info("扣減餘額開始");
remoteAccountService.decrease(order.getUserId(), order.getPayMoney());
log.info("扣減餘額結束");
//修改訂單狀態爲已完成
log.info("修改訂單狀態開始");
update(Wrappers.<Order>lambdaUpdate().set(Order::getStatus, 1).eq(Order::getUserId, order.getUserId()));
log.info("修改訂單狀態結束");
log.info("下單結束");
}
}
複製代碼
分別運行 nacos-seata-order-server、nacos-seata-storage-server 和 nacos-seata-account-server 三個服務
查詢數據庫初始數據信息
打開瀏覽器/Postman 調用接口進行下單操做:http://localhost:8081/order/create?userId=1&productId=1&count=1&payMoney=50
結果:
倉儲服務:
帳戶服務:
package com.xd.example.seata.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Account;
import com.xd.example.seata.mapper.AccountMapper;
import com.xd.example.seata.service.IAccountService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Optional;
/**
* <p>
* 服務實現類
* </p>
*
* @author lihaodong
* @since 2019-11-25
*/
@Slf4j
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {
@Override
public boolean reduceBalance(Integer userId, BigDecimal balance) throws Exception {
log.info("當前 XID: {}", RootContext.getXID());
checkBalance(userId, balance);
log.info("開始扣減用戶 {} 餘額", userId);
//模擬超時異常
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer record = baseMapper.reduceBalance(userId, balance);
log.info("結束扣減用戶 {} 餘額結果:{}", userId, record > 0 ? "操做成功" : "扣減餘額失敗");
return record > 0;
}
private void checkBalance(Integer userId, BigDecimal price) throws Exception {
log.info("檢查用戶 {} 餘額", userId);
Optional<Account> account = Optional.ofNullable(baseMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getUserId, userId)));
if (account.isPresent()) {
BigDecimal balance = account.get().getBalance();
if (balance.compareTo(price) == -1) {
log.warn("用戶 {} 餘額不足,當前餘額:{}", userId, balance);
throw new Exception("餘額不足");
}
}
}
}
複製代碼
修改完會重啓帳戶服務,再次發送請求
能夠看到訂單正常,扣除庫存正常,帳戶服務讀取超時異常
發現下單後數據庫數據並無任何改變
咱們在 seata-order-service 中註釋掉@GlobalTransactional 來看看會發生什麼
// @GlobalTransactional(name = "prex-create-order",rollbackFor = Exception.class)
@Override
public void createOrder(Order order) {
log.info("當前 XID: {}", RootContext.getXID());
log.info("下單開始,用戶:{},商品:{},數量:{},金額:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
//建立訂單
order.setStatus(0);
boolean save = save(order);
log.info("保存訂單{}", save ? "成功" : "失敗");
... 省略代碼
}
複製代碼
保存重啓訂單服務,再次請求接口 因爲 nacos-seata-account-server 的超時會致使當庫存和帳戶金額扣減後訂單狀態並無設置爲已經完成
下一篇更新
下一篇更新
參考資料:
seata.io/zh-cn/