可靠消息最終一致(異步確保型)

前言

一致性設計在分佈式系統中是一個重要問題。若是一個系統同時使用多個子數據系統來存儲與讀取數據,就必須設計知足功能需求的一致性定義。若是系統對不一樣數據子系統進行操做的結果不一致,不但可能會使用戶困惑,更可能引起更嚴重的數據問題或系統錯誤。一致性有多種級別,適用於不一樣的業務場景。對於金融等對數據一致性要求較高的行業,傳統的事務能夠提供較高的一致性保證。對於分佈式系統等對性能(performance)和可用性(availability)要求較高的場景,犧牲必定的強一致性來換取更好的用戶體驗也可接受。git

在以前寫過一次關於TCC事務的筆記,也瞭解了分佈式事務產生的緣由,以及部分解決方案,此次一塊兒跟你們總結下最終一致性方案設計思路的相關內容;github

場景

消息發送一致性的概念:是指產生消息的業務動做與消息發送的一致。數據庫

也就是說,若是業務操做成功,那麼由這個業務操做所產生的消息必定要成功投遞出去,不然就丟消息

最終一致性可使用在相似以下功能場景當中:segmentfault

  • 對應支付系統會計異步記帳業務
  • 普通的積分帳戶增長積分的服務

也就是說:採用最終一致性的數據系統一般不要求數據操做失敗時執行回滾(rollback)。用戶或系統日誌將得知操做失敗,但在另外一次成功的操做以前,數據的不一致問題並不會被自動修復。併發

ps:確定會有小夥伴有疑問,我執行程序的時候代碼報錯了致使最終一致性的方案不成功怎麼辦???小朋友你是否是有不少???

若是是代碼報錯,自己說明了你的業務代碼有問題,而不是最終一致性方案的鍋~~。異步

實現流程

最終一致性能夠藉助消息中間件,消息隊列等工具實現,須要根據本身的業務去定製不一樣的技術方案;分佈式

我們要介紹的是基於RabbitMq實現的一個可靠消息服務系統來完成事務的執行,具體流程以下圖:
圖片8.png工具

  1. 主動方應用先把消息發給消息中間件,消息狀態標記爲「待確認」;
  2. 消息中間件收到消息後,把消息持久化到消息存儲中,但並不向被動方應用投遞消息;
  3. 消息中間件返回消息持久化結果(成功/失敗),主動方應用根據返回結果進行判斷如何進行業務操做處理:性能

    • 失敗:放棄業務操做處理,結束(必要時向上層返回失敗結果);
    • 成功:執行業務操做處理;
  4. 業務操做完成後,把業務操做結果(成功/失敗)發送給消息中間件;
  5. 消息中間件收到業務操做結果後,根據業務結果進行處理;學習

    • 失敗:刪除消息存儲中的消息,結束;
    • 成功:更新消息存儲中的消息狀態爲「待發送(可發送)」,緊接着執行消息投遞;
  6. 被動方應用監聽並接收「待發送」狀態的消息,執行業務處理;
  7. 業務處理完成後,向消息中間件發送ACK,確認消息已經收到(消息)中間件將從隊列中刪除該消息)

除了以上幾個流程,消息系統還應該提供ackMsg消息確認服務、消息狀態查詢服務。

異常的處理流程

圖片1.png

主動方角度

圖片2.png

圖片3.png

從中間件的角度

圖片4.png
圖片5.png

異常狀況的總結處理

圖片6.png

方案落地

圖片7.png

消息系統組成

  1. 消息服務子系統:

是最重要的一個子系統,它接收並存儲預發送的消息,並提供進一步的確認功能。通常須要實現如下接口服務。

  • 存儲預發送消息(主動方應用系統)
  • 確認併發送消息(主動方應用系統)
  • 查詢狀態確認超時的消息(消息狀態確認子系統)
  • 確認消息已被成功消費(被動方應用系統)
  • 查詢消費確認超時的消息(消息恢復子系統)
  1. 消息管理子系統:

提供一個可視化的管理界面,對可靠消息服務系統中的數據,進行查詢和管理。好比可查看已死亡的消息,可經過界面手工重發等

  1. 消息狀態確認子系統:

提供對異常狀況的處理。當消息服務子系統收到並保存預發送消息,但因異常狀況,沒有收到確認發送消息時,這種消息不可能一直留存在數據庫中。這種狀況的數據,就須要消息狀態確認子系統按期撈取這些待確認超時的數據,去調用主動方應用系統中的業務查詢接口進行覈對確認。根據覈對結果決定是發送消息仍是刪除數據。

  1. 消息恢復子系統:

若是消息數據已經接收到業務確認,這種通過業務確認的消息,就必定要發送到MQ,並被消費方成功消費,毫不能丟。消息恢復子系統按期撈取那些狀態是「發送中」,而沒有被消費確認的超時消息,進行從新發送。

  1. 實時消息服務子系統(MQ):

消費方監聽程序,接收MQ消息,成功處理後調用消息服務子系統的接口,確認消息已被成功消費,能夠刪除。

總體流程

  1. 用戶下單,主動方應用預發送消息給消息服務子系統。
  2. 消息服務子系統存儲預發送的消息。
  3. 返回存儲預發送消息的結果。
  4. 若是第3步返回的結果是成功的,則執行業務操做,不然不執行。
  5. 業務操做成功後,調用消息服務子系統進行確認發送消息。
  6. 將消息服務庫中存儲的預發送消息發送,並更新該消息的狀態爲已發送(但不是已被消費)。
  7. 消息中間件發送消息到消費端應用。
  8. 消費端應用調用被動方應用服務。
  9. 被動方應用返回結果給消費端應用。
  10. 消費端應用向消息中間件ack此條消息,並向消息服務子系統進行確認成功消費消息,
  11. 讓消息服務子系統刪除該條消息或者將狀態置爲已成功消費。
  12. 消息狀態子系統定時去查一下消息數據,看看有沒有是已發送狀態的超時消息,就是一直沒有變成已成功消費的那種消息,主動方應用系統應該提供查詢接口,針對某條消息查詢該條消息對應的業務數據是否爲處理成功
  13. 若是業務數據是處理成功的狀態,那麼就再次調用確認併發送消息,即進入第6步。
  14. 若是業務數據是處理失敗的,那麼就調用消息服務子系統進行刪除該條消息數據。

謝謝觀賞


本文是學習過程當中的筆記整理,若是有不對的地方請你們聯繫我,及時糾正,避免誤導童鞋們,謝謝各位童鞋的耐心觀看,但願本文會幫助到您~後期計劃在Hyperf中寫一個demo,感興趣的能夠留意個人GitHub

相關文章
相關標籤/搜索