本篇實戰所使用Spring有關版本:java
SpringBoot:2.1.7.RELEASEmysql
Spring Cloud:Greenwich.SR2git
Spring CLoud Alibaba:2.1.0.RELEASEgithub
在構建微服務的過程當中,不論是使用什麼框架、組件來構建,都繞不開一個問題,跨服務的業務操做如何保持數據一致性。web
首先,設想一個傳統的單體應用,不管多少內部調用,最後終歸是在同一個數據庫上進行操做來完成一貫業務操做,如圖:redis
隨着業務量的發展,業務需求和架構發生了巨大的變化,總體架構由原來的單體應用逐漸拆分紅爲了微服務,原來的3個服務被從一個單體架構上拆開了,成爲了3個獨立的服務,分別使用獨立的數據源,也不在以前共享同一個數據源了,具體的業務將由三個服務的調用來完成,如圖:算法
此時,每個服務的內部數據一致性仍然有本地事務來保證。可是面對整個業務流程上的事務應該如何保證呢?這就是在微服務架構下面臨的挑戰,如何保證在微服務中的數據一致性。spring
所謂的 XA 方案,即兩階段提交,有一個事務管理器的概念,負責協調多個數據庫(資源管理器)的事務,事務管理器先問問各個數據庫你準備好了嗎?若是每一個數據庫都回復 ok,那麼就正式提交事務,在各個數據庫上執行操做;若是任何其中一個數據庫回答不 ok,那麼就回滾事務。sql
分佈式系統的一個難點是如何保證架構下多個節點在進行事務性操做的時候保持一致性。爲實現這個目的,二階段提交算法的成立基於如下假設:數據庫
TCC的全稱是:Try、Confirm、Cancel。
這種方案說實話幾乎不多人使用,可是也有使用的場景。由於這個事務回滾其實是嚴重依賴於你本身寫代碼來回滾和補償了,會形成補償代碼巨大。
TCC的理論有點抽象,下面咱們藉助一個帳務拆分這個實際業務場景對TCC事務的流程作一個描述,但願對理解TCC有所幫助。
業務流程:分別位於三個不一樣分庫的賬戶A、B、C,A和B一塊兒向C轉賬共80元:
Try:嘗試執行業務。
完成全部業務檢查(一致性):檢查A、B、C的賬戶狀態是否正常,賬戶A的餘額是否很多於30元,賬戶B的餘額是否很多於50元。
預留必須業務資源(準隔離性):賬戶A的凍結金額增長30元,賬戶B的凍結金額增長50元,這樣就保證不會出現其餘併發進程扣減了這兩個賬戶的餘額而致使在後續的真正轉賬操做過程當中,賬戶A和B的可用餘額不夠的狀況。
Confirm:確認執行業務。
真正執行業務:若是Try階段賬戶A、B、C狀態正常,且賬戶A、B餘額夠用,則執行賬戶A給帳戶C轉帳30元、賬戶B給帳戶C轉帳50元的轉賬操做。
不作任何業務檢查:這時已經不須要作業務檢查,Try階段已經完成了業務檢查。
只使用Try階段預留的業務資源:只須要使用Try階段賬戶A和賬戶B凍結的金額便可。
Cancel:取消執行業務。
釋放Try階段預留的業務資源:若是Try階段部分紅功,好比賬戶A的餘額夠用,且凍結相應金額成功,賬戶B的餘額不夠而凍結失敗,則須要對賬戶A作Cancel操做,將賬戶A被凍結的金額解凍掉。
Seata 的方案其實一個 XA 兩階段提交的改進版,具體區別以下:
架構的層面:
XA 方案的 RM 其實是在數據庫層,RM 本質上就是數據庫自身(經過提供支持 XA 的驅動程序來供應用使用)。
而 Seata 的 RM 是以二方包的形式做爲中間件層部署在應用程序這一側的,不依賴與數據庫自己對協議的支持,固然也不須要數據庫支持 XA 協議。這點對於微服務化的架構來講是很是重要的:應用層不須要爲本地事務和分佈式事務兩類不一樣場景來適配兩套不一樣的數據庫驅動。
這個設計,剝離了分佈式事務方案對數據庫在 協議支持 上的要求。
兩階段提交:
不管 Phase2 的決議是 commit 仍是 rollback,事務性資源的鎖都要保持到 Phase2 完成才釋放。
設想一個正常運行的業務,大機率是 90% 以上的事務最終應該是成功提交的,咱們是否能夠在 Phase1 就將本地事務提交呢?這樣 90% 以上的狀況下,能夠省去 Phase2 持鎖的時間,總體提升效率。
這個設計,極大地減小了分支事務對資源(數據和鏈接)的鎖定時間,給總體併發和吞吐的提高提供了基礎。
在本節,咱們將經過一個實戰案例來具體介紹Seata的使用方式,咱們將模擬一個簡單的用戶購買商品下單場景,建立3個子工程,分別是 order-server (下單服務)、storage-server(庫存服務)和 pay-server (支付服務),具體流程圖如圖:
在本次實戰中,咱們使用Nacos作爲服務中心和配置中心,Nacos部署請參考本書的第十一章,這裏再也不贅述。
接下來咱們須要部署Seata的Server端,下載地址爲:https://github.com/seata/seata/releases ,建議選擇最新版本下載,目前筆者看到的最新版本爲 v0.8.0 ,下載 seata-server-0.8.0.tar.gz 解壓後,打開 conf 文件夾,咱們需對其中的一些配置作出修改。
registry {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}複製代碼
這裏咱們選擇使用Nacos做爲服務中心和配置中心,這裏作出對應的配置,同時能夠看到Seata的註冊服務支持:file 、nacos 、eureka、redis、zk、consul、etcd三、sofa等方式,配置支持:file、nacos 、apollo、zk、consul、etcd3等方式。
這裏咱們須要其中配置的數據庫相關配置,具體以下:
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://192.168.0.128:3306/seata"
user = "root"
password = "123456"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}複製代碼
這裏數據庫默認是使用mysql,須要配置對應的數據庫鏈接、用戶名和密碼等。
service.vgroup_mapping.spring-cloud-pay-server=default
service.vgroup_mapping.spring-cloud-order-server=default
service.vgroup_mapping.spring-cloud-storage-server=default複製代碼
這裏的語法爲:service.vgroup_mapping.${your-service-gruop}=default
,中間的${your-service-gruop}
爲本身定義的服務組名稱,這裏須要咱們在程序的配置文件中配置,筆者這裏直接使用程序的spring.application.name
。
須要在剛纔配置的數據庫中執行數據初始腳本 db_store.sql ,這個是全局事務控制的表,須要提早初始化。
這裏咱們只是作演示,理論上上面三個業務服務應該分屬不一樣的數據庫,這裏咱們只是在同一臺數據庫下面建立三個 Schema ,分別爲 dbaccount 、 dborder 和 db_storage ,具體如圖:
由於咱們是使用的Nacos做爲配置中心,因此這裏須要先執行腳原本初始化Nacos的相關配置,命令以下:
cd conf
sh nacos-config.sh 192.168.0.128複製代碼
執行成功後能夠打開Nacos的控制檯,在配置列表中,能夠看到初始化了不少 Group 爲 SEATA_GROUP 的配置,如圖:
初始化成功後,可使用下面的命令啓動Seata的Server端:
cd bin
sh seata-server.sh -p 8091 -m file複製代碼
啓動後在 Nacos 的服務列表下面能夠看到一個名爲 serverAddr 的服務
到這裏,咱們的環境準備工做就作完了,接下來開始代碼實戰。
因爲本示例代碼偏多,這裏僅介紹核心代碼和一些須要注意的代碼,其他代碼各位讀者能夠訪問本書配套的代碼倉庫獲取。
子工程common用來放置一些公共類,主要包含視圖 VO 類和響應類 OperationResponse.java。
代碼清單:Alibaba/seata-nacos-jpa/pom.xml
***
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Cloud Nacos Service Discovery -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Spring Cloud Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- Spring Cloud Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>複製代碼
說明:本示例是使用 JPA 做爲數據庫訪問 ORM 層, Mysql 做爲數據庫,需引入 JPA 和 Mysql 相關依賴, spring-cloud-alibaba-dependencies
的版本是 2.1.0.RELEASE , 其中有關Seata的組件版本爲 v0.7.1 ,雖然和服務端版本不符,經簡單測試,未發現問題。
Seata 是經過代理數據源實現事務分支,因此須要配置 io.seata.rm.datasource.DataSourceProxy 的 Bean,且是 @Primary默認的數據源,不然事務不會回滾,沒法實現分佈式事務,數據源配置類DataSourceProxyConfig.java以下:
代碼清單:Alibaba/seata-nacos-jpa/order-server/src/main/java/com/springcloud/orderserver/config/DataSourceProxyConfig.java***
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}複製代碼
咱們在order-server服務中開始整個業務流程,須要在這裏的方法上增長全局事務的註解@GlobalTransactional
,具體代碼以下:
代碼清單:Alibaba/seata-nacos-jpa/order-server/src/main/java/com/springcloud/orderserver/service/impl/OrderServiceImpl.java***
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderDao orderDao;
private final String STORAGE_SERVICE_HOST = "http://spring-cloud-storage-server/storage";
private final String PAY_SERVICE_HOST = "http://spring-cloud-pay-server/pay";
@Override
@GlobalTransactional
public OperationResponse placeOrder(PlaceOrderRequestVO placeOrderRequestVO) {
Integer amount = 1;
Integer price = placeOrderRequestVO.getPrice();
Order order = Order.builder()
.userId(placeOrderRequestVO.getUserId())
.productId(placeOrderRequestVO.getProductId())
.status(OrderStatus.INIT)
.payAmount(price)
.build();
order = orderDao.save(order);
log.info("保存訂單{}", order.getId() != null ? "成功" : "失敗");
log.info("當前 XID: {}", RootContext.getXID());
// 扣減庫存
log.info("開始扣減庫存");
ReduceStockRequestVO reduceStockRequestVO = ReduceStockRequestVO.builder()
.productId(placeOrderRequestVO.getProductId())
.amount(amount)
.build();
String storageReduceUrl = String.format("%s/reduceStock", STORAGE_SERVICE_HOST);
OperationResponse storageOperationResponse = restTemplate.postForObject(storageReduceUrl, reduceStockRequestVO, OperationResponse.class);
log.info("扣減庫存結果:{}", storageOperationResponse);
// 扣減餘額
log.info("開始扣減餘額");
ReduceBalanceRequestVO reduceBalanceRequestVO = ReduceBalanceRequestVO.builder()
.userId(placeOrderRequestVO.getUserId())
.price(price)
.build();
String reduceBalanceUrl = String.format("%s/reduceBalance", PAY_SERVICE_HOST);
OperationResponse balanceOperationResponse = restTemplate.postForObject(reduceBalanceUrl, reduceBalanceRequestVO, OperationResponse.class);
log.info("扣減餘額結果:{}", balanceOperationResponse);
Integer updateOrderRecord = orderDao.updateOrder(order.getId(), OrderStatus.SUCCESS);
log.info("更新訂單:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失敗");
return OperationResponse.builder()
.success(balanceOperationResponse.isSuccess() && storageOperationResponse.isSuccess())
.build();
}
}複製代碼
其次,咱們須要在另外兩個服務的方法中增長註解@Transactional
,表示開啓事務。
這裏的遠端服務調用是經過 RestTemplate
,須要在工程啓動時將 RestTemplate
注入 Spring 容器中管理。
工程中需在 resources 目錄下增長有關Seata的配置文件 registry.conf ,以下:
代碼清單:Alibaba/seata-nacos-jpa/order-server/src/main/resources/registry.conf***
registry {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.0.128"
namespace = "public"
cluster = "default"
}
}複製代碼
在 bootstrap.yml 中的配置以下:
代碼清單:Alibaba/seata-nacos-jpa/order-server/src/main/resources/bootstrap.yml
***
spring:
application:
name: spring-cloud-order-server
cloud:
nacos:
# nacos config
config:
server-addr: 192.168.0.128
namespace: public
group: SEATA_GROUP
# nacos discovery
discovery:
server-addr: 192.168.0.128
namespace: public
enabled: true
alibaba:
seata:
tx-service-group: ${spring.application.name}複製代碼
service.vgroup_mapping.${your-service-gruop}=default
中間的 ${your-service-gruop}
。這兩處配置請務必一致,不然在啓動工程後會一直報錯 no available server to connect
。 數據庫初始腳本位於:Alibaba/seata-nacos-jpa/sql ,請分別在三個不一樣的 Schema 中執行。
測試工具咱們選擇使用 PostMan ,啓動三個服務,順序無關 order-server、pay-server 和 storage-server 。
使用 PostMan 發送測試請求,如圖:
數據庫初始化餘額爲 10 ,這裏每次下單將會消耗 5 ,咱們能夠正常下單兩次,第三次應該下單失敗,而且回滾 db_order 中的數據。數據庫中數據如圖:
咱們進行第三次下單操做,如圖:
這裏看到直接報錯500,查看數據庫 db_order 中的數據,如圖:
能夠看到,這裏的數據並未增長,咱們看下子工程_rder-server的控制檯打印:
日誌已通過簡化處理
Hibernate: insert into orders (pay_amount, product_id, status, user_id) values (?, ?, ?, ?)
c.s.b.c.service.impl.OrderServiceImpl : 保存訂單成功
c.s.b.c.service.impl.OrderServiceImpl : 當前 XID: 192.168.0.102:8091:2021674307
c.s.b.c.service.impl.OrderServiceImpl : 開始扣減庫存
c.s.b.c.service.impl.OrderServiceImpl : 扣減庫存結果:OperationResponse(success=true, message=操做成功, data=null)
c.s.b.c.service.impl.OrderServiceImpl : 開始扣減餘額
i.s.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.0.102:8091:2021674307,branchId=2021674308,branchType=AT,resourceId=jdbc:mysql://192.168.0.128:3306/db_order,applicationData=null
io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.0.102:8091:2021674307 2021674308 jdbc:mysql://192.168.0.128:3306/db_order
i.s.rm.datasource.undo.UndoLogManager : xid 192.168.0.102:8091:2021674307 branch 2021674308, undo_log deleted with GlobalFinished
io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
i.seata.tm.api.DefaultGlobalTransaction : [192.168.0.102:8091:2021674307] rollback status:Rollbacked複製代碼
從日誌中沒有能夠清楚的看到,在服務order-server是先執行了訂單寫入操做,而且調用扣減庫存的接口,經過查看storage-server的日誌也能夠發現,同樣是先執行了庫存修改操做,直到扣減餘額的時候發現餘額不足,開始對 xid 爲 192.168.0.102:8091:2021674307
執行回滾操做,而且這個操做是全局回滾。
目前在 Seata v0.8.0 的版本中,Server端還沒有支持集羣部署,不建議應用於生產環境,而且開源團隊計劃在 v1.0.0 版本的時候可使用與生產環境,各位讀者能夠持續關注這個開源項目。
參考資料:Seata官方文檔