消息中間件使用普遍,經常使用來削峯填谷、系統解耦、異步處理。異步處理多是使用的最多的場景了,好比如今的技術博客網站,都採用積分制,用戶發表一篇文章後,能夠獲取想要的積分,爲了提高系統的性能,給用戶加積分的操做能夠異步處理,並不須要放在同步流程中。html
咱們能夠把用戶ID,須要增長的積分封裝成一條消息投遞到消息系統中,異步處理加積分操做,因爲這是發生在不一樣服務器之間,消息有可能投遞失敗、處理失敗等問題,從而致使用戶加積分失敗,還有一種多是消息重複投遞,那麼用戶就有可能重複加積分,無論出現那種狀況,都是不正常的狀況。java
要避免上面的兩種狀況,就須要咱們儘可能保證消息不丟失和消息只被消費一次,這篇文章拋開具體的消息中間件,從消息系統的通用層面上,談談如何避免這兩種狀況。git
一條消息從生產到消費這條鏈路中,有三個地方可能會形成消息丟失,分別以下:github
消息生產者和消息系統通常都是獨立部署在不一樣的服務器上,兩臺服務器之間要通訊就要經過網絡來完成,網絡是不穩定,可能會發生抖動,那麼數據就有可能丟失。網絡發生抖動會有如下兩種狀況。算法
情景一:消息在傳送給消息系統的過程當中發生網絡抖動,數據直接丟失。 情景二:消息已經到達消息系統,可是在消息系統給生產者服務器返回信息時,網絡發生抖動,此時的數據不必定真正的丟失,極可能只是生產者認爲數據丟失。sql
針對消息在消息生產時丟失,能夠採起重投機制,當程序檢測到網絡異常時,將消息再次投遞到消息系統。可是從新投遞在情景二狀況下,可能形成數據重複,如何解決這個問題,在後面會提到。數據庫
消息系統是能夠對消息進行持久化,通常都是將消息存儲到本地磁盤中,固然也有少數消息中間件支持將數據持久化到數據庫中,那麼消息系統的性能可能就會降低。服務器
若是你對 Redis 的持久化有必定的瞭解話,你會發現 Redis 在持久化數據時並非每新增一條就當即存入到本地磁盤,而是會將數據先寫入到操做系統的 Page Cache 中,當知足必定條件時,再將 Page Cache 中的數據刷入磁盤,由於這樣能夠減小對磁盤的隨機 I/O 操做,咱們知道隨機 I/O 是很是耗時的,這樣也提升了系統性能,消息中間件也不例外,在持久化時也是採用這種方式。網絡
在某些極端狀況下,可能會形成 Page Cache 中的數據丟失,好比忽然停電或者機器異常重啓操做。要解決 Page Cache 中數據丟失問題,能夠採用集羣部署的方式,來儘可能保證數據不丟失。異步
消息在消費過程當中也是會發生丟失的,並且在消費過程當中丟失的機率要比前兩種狀況大不少。一條消息消費過程大概分紅三步:消費者拉取消息,消費者處理消息,消息系統更新消費進度。
第一步在拉取消息的時候可能發生網絡抖動異常,第二步在處理消息的時候可能發生一些業務異常,而致使流程並無走完,若是在第一步、第二步發生異常的狀況下,通知消息系統更新消費進度,那麼這條失敗的消息就永遠不會在被處理了,天然就丟失了,其實咱們的業務並無跑完。
要避免消息在消費時丟失的狀況,能夠在消息接收和處理完成以後才更新消費進度,可是在極端的狀況下,會出現消息重複消費的問題,好比某一條消息在處理完成以後,消費者宕機了,這時尚未更新消費進度,消費者重啓後,這條消息仍是會被消費到。
消息系統自己不能保證消息僅被消費一次,由於消費自己可能重複、下游系統啓動拉取重複、失敗重試帶來的重複、補償邏輯致使的重複都有可能造重複消息,要保證消息僅被消費一次能夠利用等冪性來實現。
等冪是數學上的一個概念,就是屢次執行同一個操做和執行一次操做,最終獲得的結果是相同的。
從等冪的概念上就能夠看出來,就算消息執行屢次也不會對系統形成影響,那麼在使用消息系統時如何保證等冪性呢?由於生產者和消費者都有可能產生重複消息,因此要在生產者和消費者兩端都保證等冪性。
保證生產者等冪性,在生產消息的時候,利用雪花算法給消息生成一個全局 ID,在消息系統中維護消息已 ID 映射關係,若是在映射表中已經存在相同 ID,這丟棄這條消息,雖然消息被投遞了兩次,可是實際上就保存了一條,避免了消息重複問題。
生產者等冪性跟所選者的消息中間件有關係,由於絕大數狀況下消息系統不須要咱們本身實現,因此等冪性是不太好控制的,消費者等冪性纔是咱們開發人員控制的重點方向。
在消費者端能夠從通用層和業務層兩個方面來作等冪操做,取決於咱們的業務要求。
在通用層面中,利用好消息生成是產生的全局惟一ID,消息被處理成功後,把這個全局 ID 存入到數據中,在處理下一條消息以前,先從數據庫中查詢這個全局 ID 是否存在,若是已經存在,則直接放棄該消息。
利用這個全局惟一ID就實現了消息等冪性,僞代碼以下:
boolean isIDExisted = selectByID(ID); // 判斷ID是否存在 if(isIDExisted) { return; //存在則直接返回 } else { process(message); //不存在,則處理消息 saveID(ID); //存儲ID }
可是在極端狀況下,這種方式仍是會出問題,若是消息在處理以後,還沒來得及保存到數據庫,消費者就宕機重啓了,重啓以後還會再次獲取該消息,執行時查詢該消息並未被消費過,仍是會執行兩次消費。能夠引入數據庫事務來解決這個問題,可是會下降系統性能。若是對消息重複消費沒有特別嚴格要求的話,直接使用這種沒有引入事務的通用方案就行了,畢竟這也是極小機率的事情。
在業務層面上,咱們可選擇性就變多了,好比樂觀鎖、悲觀鎖、內存去重(https://github.com/RoaringBitmap/RoaringBitmap)等方法。
咱們拿樂觀鎖來舉例,好比咱們要給一個用戶加積分,由於加積分操做並不須要放在主業務中,因此就可使用消息系統來異步通知,要使用樂觀鎖,就須要給積分表添加一個版本號字段。而且在生產消息的時候先查詢這個帳號的版本號而且連同消息一塊兒發送到消息系統中。
消費者拿到消息和版本號後,在執行更新積分操做的 SQL 時帶上版本號,相似於:
update score set score = score + 20, version=version+1 where userId=1 and version=1;
這條消息消費成功後,version 就變成了 2,那麼若是有重複的 version=1 的消息再次被消費者拉取到,SQL 語句並不會執行成功,從而保證了消息的冪等性。
要保證消息僅被消費一次,咱們須要把重點放在消費者這一段,利用等冪性來保證消息被消費一次。
今天站在消息中間件的通用層面上,聊了聊如何保證數據不丟失和僅被消費一次,但願今天的文章對您的學習或者工做有所幫助,若是您認爲文章有價值,歡迎點個贊,謝謝。
目前互聯網上不少大佬都有消息中間件相關文章,若有雷同,請多多包涵了。原創不易,碼字不易,還但願你們多多支持。若文中有所錯誤之處,還望提出,謝謝。
原文出處:https://www.cnblogs.com/jamaler/p/12467206.html