消費端如何保證消息隊列MQ的有序消費

消息無序產生的緣由

消息隊列,既然是隊列就能保證消息在進入隊列,以及出隊列的時候保證消息的有序性,顯然這是在消息的生產端(Producer),可是每每在生產環境中有多個消息的消費端(Consumer),儘管消費端在拉取消息時是有序的,但各個消息因爲網絡等方面緣由沒法保證在各個消費端中處理時有序。html

場景分析

前後兩次修改了商品信息,消息A和消息B前後同步寫入MySQL,接着異步寫入消息隊列中發送消息,此時消息隊列生產端(Producer)按時序前後發出了A和B兩條消息(消息A先發出,消息B後發出)。按業務邏輯,商品信息的最終狀態須要以消息A和消息B綜合爲準。程序員

看似一個比較常見的同步寫數據庫,異步發送消息的場景,但實際上須要保證消息的有序消費。數據庫

  • 假設1:消息A只包含修改的商品名稱,消息B只包含修改的商品重量,此時消息隊列的消費端實際上不須要關注消息時序,消息隊列消費端(Consumer)只管消費便可。
  • 假設2:消息A包含修改的商品名稱、重量,消息B包含修改的商品名稱,此時消費端首先接收到消息B,後接收到消息A,那麼消息B的修改就會被覆蓋。此時消息隊列的消費端實際上又須要關注消息時序

可見,你沒法保證消息中包含什麼信息,此時必須保證消息的有序消費。緩存

業務角度如何保證消息有序消費

  • 生產端在發送消息時,始終保證消息是全量信息。
  • 消費端在接收消息時,經過緩存時間戳的方式,消費消息時判斷消息產生的時間是否最新,若是不是則丟棄,若是是則執行下一步。

下面經過僞代碼的方式描述:安全

生產端僞代碼網絡

insertWare(ware); #插入數據到數據庫,一般在插入數據庫時咱們只會update修改的字段,而不會全量插入異步

ware = selectWareById(ware.getId); #獲取商品的全量信息(此時是最新的),用於將它放入到消息隊列中分佈式

syncMq(ware); #異步發送mq消息Afetch

消費端僞代碼spa

ware = fetchWare(); #獲取消息

if (isLasted(ware)) #經過商品的修改時間戳判斷是不是最新的修改

​ TODO #執行下一步業務邏輯

else

​ return #丟棄該消息

重點在於消費端如何判斷該消息是不是最新的修改也就是isLasted方法。

isLasted方法

Long modified = getCacheById(ware.getId); #獲取緩存中該條商品的最新修改時間

If (ware.getModified > modified) { #若是消息中商品修改時間大於緩存中的時間,說明是最新操做

​ setCacheById(ware); #將該條消息的商品修改時間戳寫入到緩存中

​ return true; } else #若是消息中的商品修改時間小於緩存中的時間,說明該條消息屬於「歷史操做」,不對其更新

​ return false;

以上就是經過僞代碼的方式,描述如何經過業務手段保證消息有序消費,重點在於全量發送信息和緩存時間戳。在其中還有一些技術實現細節。

例如:消費端消費消息B,執行到獲取時間戳緩存以後,並在從新設置新的緩存以前,此時另外一個消費端剛好也正在消費B它也正執行到獲取時間戳緩存,因爲消息A此時並無更新緩存,消息A拿到的緩存仍然是舊的緩存,這時就會存在兩個消費端都認爲本身所消費的消息時最新的,形成該丟棄的消息沒丟。

顯然,這是分佈式線程安全問題,分佈式鎖一般使用Redis或者ZooKeeper,加鎖後的執行時序以下圖所示。

這是從業務角度保證消息在消費端有序消費。經過在消息發送端全量發送消息以及在消息消費端緩存時間戳就能夠保證消息的有序消費。

在上述場景中是先同步寫入MySQL,再獲取商品全量數據,接着再異步發送消息。這一系列的步驟能夠經過接MySQL的binlog實現,在同步寫入MySQL後,MySQL發送binlog變動,經過阿里巴巴Canal中間件接收MySQL的binlog變動再發送消息到消息隊列。

<div align="center">這是一個能給程序員加buff的公衆號 (CoderBuff)</div> <div align="center"><img src="https://img2018.cnblogs.com/blog/630246/201907/630246-20190717223740465-1981496921.png" /></div>

原文出處:https://www.cnblogs.com/yulinfeng/p/11254925.html

相關文章
相關標籤/搜索