消息隊列已經逐漸成爲企業IT系統內部通訊的核心手段。它具備低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成爲異步RPC的主要手段之一。
當今市面上有不少主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,煊赫一時的Kafka,阿里巴巴自主開發的Notify、MetaQ、RocketMQ等。
本文不會一一介紹這些消息隊列的全部特性,而是探討一下自主開發設計一個消息隊列時,你須要思考和設計的重要方面。過程當中咱們會參考這些成熟消息隊列的不少重要思想。
本文首先會闡述何時你須要一個消息隊列,而後以Push模型爲主,從零開始分析設計一個消息隊列時須要考慮到的問題,如RPC、高可用、順序和重複消息、可靠投遞、消費關係解析等。
也會分析以Kafka爲表明的pull模型所具有的優勢。最後是一些高級主題,如用批量/異步提升性能、pull模型的系統設計理念、存儲子系統的設計、流量控制的設計、公平調度的實現等。其中最後四個方面會放在下篇講解。前端
當你須要使用消息隊列時,首先須要考慮它的必要性。可使用mq的場景有不少,最經常使用的幾種,是作業務解耦/最終一致性/廣播/錯峯流控等。反之,若是須要強一致性,關注業務邏輯的處理結果,則RPC顯得更爲合適。算法
解耦是消息隊列要解決的最本質問題。所謂解耦,簡單點講就是一個事務,只關心核心的流程。而須要依賴其餘系統但不那麼重要的事情,有通知便可,無需等待結果。換句話說,基於消息的模型,關心的是「通知」,而非「處理」。
好比在美團旅遊,咱們有一個產品中心,產品中心上游對接的是主站、移動後臺、旅遊供應鏈等各個數據源;下游對接的是篩選系統、API系統等展現系統。當上遊的數據發生變動的時候,若是不使用消息系統,勢必要調用咱們的接口來更新數據,就特別依賴產品中心接口的穩定性和處理能力。但其實,做爲旅遊的產品中心,也許只有對於旅遊自建供應鏈,產品中心更新成功纔是他們關心的事情。而對於團購等外部系統,產品中心更新成功也好、失敗也罷,並非他們的職責所在。他們只須要保證在信息變動的時候通知到咱們就行了。
而咱們的下游,可能有更新索引、刷新緩存等一系列需求。對於產品中心來講,這也不是咱們的職責所在。說白了,若是他們定時來拉取數據,也能保證數據的更新,只是實時性沒有那麼強。但使用接口方式去更新他們的數據,顯然對於產品中心來講太過於「重量級」了,只須要發佈一個產品ID變動的通知,由下游系統來處理,可能更爲合理。
再舉一個例子,對於咱們的訂單系統,訂單最終支付成功以後可能須要給用戶發送短信積分什麼的,但其實這已經不是咱們系統的核心流程了。若是外部系統速度偏慢(好比短信網關速度很差),那麼主流程的時間會加長不少,用戶確定不但願點擊支付過好幾分鐘纔看到結果。那麼咱們只須要通知短信系統「咱們支付成功了」,不必定非要等待它處理完成。數據庫
最終一致性指的是兩個系統的狀態保持一致,要麼都成功,要麼都失敗。固然有個時間限制,理論上越快越好,但實際上在各類異常的狀況下,可能會有必定延遲達到最終一致狀態,但最後兩個系統的狀態是同樣的。
業界有一些爲「最終一致性」而生的消息隊列,如Notify(阿里)、QMQ(去哪兒)等,其設計初衷,就是爲了交易系統中的高可靠通知。
以一個銀行的轉帳過程來理解最終一致性,轉帳的需求很簡單,若是A系統扣錢成功,則B系統加錢必定成功。反之則一塊兒回滾,像什麼都沒發生同樣。
然而,這個過程當中存在不少可能的意外:編程
可見,想把這件看似簡單的事真正作成,真的不那麼容易。全部跨VM的一致性問題,從技術的角度講通用的解決方案是:緩存
最終一致性不是消息隊列的必備特性,但確實能夠依靠消息隊列來作最終一致性的事情。另外,全部不保證100%不丟消息的消息隊列,理論上沒法實現最終一致性。好吧,應該說理論上的100%,排除系統嚴重故障和bug。
像Kafka一類的設計,在設計層面上就有丟消息的可能(好比定時刷盤,若是掉電就會丟消息)。哪怕只丟千分之一的消息,業務也必須用其餘的手段來保證結果正確。性能優化
消息隊列的基本功能之一是進行廣播。若是沒有消息隊列,每當一個新的業務方接入,咱們都要聯調一次新接口。有了消息隊列,咱們只須要關心消息是否送達了隊列,至於誰但願訂閱,是下游的事情,無疑極大地減小了開發和聯調的工做量。
好比本文開始提到的產品中心發佈產品變動的消息,以及景點庫不少去重更新的消息,可能「關心」方有不少個,但產品中心和景點庫只須要發佈變動消息便可,誰關心誰接入。網絡
試想上下游對於事情的處理能力是不一樣的。好比,Web前端每秒承受上千萬的請求,並非什麼神奇的事情,只須要加多一點機器,再搭建一些LVS負載均衡設備和Nginx等便可。但數據庫的處理能力卻十分有限,即便使用SSD加分庫分表,單機的處理能力仍然在萬級。因爲成本的考慮,咱們不能奢求數據庫的機器數量追上前端。
這種問題一樣存在於系統和系統之間,如短信系統可能因爲短板效應,速度卡在網關上(每秒幾百次請求),跟前端的併發量不是一個數量級。但用戶晚上個半分鐘左右收到短信,通常是不會有太大問題的。若是沒有消息隊列,兩個系統之間經過協商、滑動窗口等複雜的方案也不是說不能實現。但系統複雜性指數級增加,勢必在上游或者下游作存儲,而且要處理定時、擁塞等一系列問題。並且每當有處理能力有差距的時候,都須要單獨開發一套邏輯來維護這套邏輯。因此,利用中間系統轉儲兩個系統的通訊內容,並在下游系統有能力處理這些消息的時候,再處理這些消息,是一套相對較通用的方式。session
總而言之,消息隊列不是萬能的。對於須要強事務保證並且延遲敏感的,RPC是優於消息隊列的。
對於一些無關痛癢,或者對於別人很是重要可是對於本身不是那麼關心的事情,能夠利用消息隊列去作。
支持最終一致性的消息隊列,可以用來處理延遲不那麼敏感的「分佈式事務」場景,並且相對於笨重的分佈式事務,多是更優的處理方式。
當上下游系統處理能力存在差距的時候,利用消息隊列作一個通用的「漏斗」。在下游有能力處理的時候,再進行分發。
若是下游有不少系統關心你的系統發出的通知的時候,果斷地使用消息隊列吧。併發
咱們如今明確了消息隊列的使用場景,下一步就是如何設計實現一個消息隊列了。
基於消息的系統模型,不必定須要broker(消息隊列服務端)。市面上的的Akka(actor模型)、ZeroMQ等,其實都是基於消息的系統設計範式,可是沒有broker。
咱們之因此要設計一個消息隊列,而且配備broker,無外乎要作兩件事情:負載均衡
通常來說,設計消息隊列的總體思路是先build一個總體的數據流,例如producer發送給broker,broker發送給consumer,consumer回覆消費確認,broker刪除/備份消息等。
利用RPC將數據流串起來。而後考慮RPC的高可用性,儘可能作到無狀態,方便水平擴展。
以後考慮如何承載消息堆積,而後在合適的時機投遞消息,而處理堆積的最佳方式,就是存儲,存儲的選型須要綜合考慮性能/可靠性和開發維護成本等諸多因素。
爲了實現廣播功能,咱們必需要維護消費關係,能夠利用zk/config server等保存消費關係。
在完成了上述幾個功能後,消息隊列基本就實現了。而後咱們能夠考慮一些高級特性,如可靠投遞,事務特性,性能優化等。
下面咱們會以設計消息隊列時重點考慮的模塊爲主線,穿插灌輸一些消息隊列的特性實現方法,來具體分析設計實現一個消息隊列時的方方面面。
剛纔講到,所謂消息隊列,無外乎兩次RPC加一次轉儲,固然須要消費端最終作消費確認的狀況是三次RPC。既然是RPC,就必然牽扯出一系列話題,什麼負載均衡啊、服務發現啊、通訊協議啊、序列化協議啊,等等。在這一塊,個人強烈建議是不要重複造輪子。利用公司現有的RPC框架:Thrift也好,Dubbo也好,或者是其餘自定義的框架也好。由於消息隊列的RPC,和普通的RPC沒有本質區別。固然了,自主利用Memchached或者Redis協議從新寫一套RPC框架並不是不可(如MetaQ使用了本身封裝的Gecko NIO框架,卡夫卡也用了相似的協議)。但實現成本和難度無疑倍增。排除對效率的極端要求,均可以使用現成的RPC框架。
簡單來說,服務端提供兩個RPC服務,一個用來接收消息,一個用來確認消息收到。而且作到無論哪一個server收到消息和確認消息,結果一致便可。固然這中間可能還涉及跨IDC的服務的問題。這裏和RPC的原則是一致的,儘可能優先選擇本機房投遞。你可能會問,若是producer和consumer自己就在兩個機房了,怎麼辦?首先,broker必須保證感知的到全部consumer的存在。其次,producer儘可能選擇就近的機房就行了。
其實全部的高可用,是依賴於RPC和存儲的高可用來作的。先來看RPC的高可用,美團的基於MTThrift的RPC框架,阿里的Dubbo等,其自己就具備服務自動發現,負載均衡等功能。而消息隊列的高可用,只要保證broker接受消息和確認消息的接口是冪等的,而且consumer的幾臺機器處理消息是冪等的,這樣就把消息隊列的可用性,轉交給RPC框架來處理了。
那麼怎麼保證冪等呢?最簡單的方式莫過於共享存儲。broker多機器共享一個DB或者一個分佈式文件/kv系統,則處理消息天然是冪等的。就算有單點故障,其餘節點能夠馬上頂上。另外failover能夠依賴定時任務的補償,這是消息隊列自己自然就能夠支持的功能。存儲系統自己的可用性咱們不須要操太多心,放心大膽的交給DBA們吧!
對於不共享存儲的隊列,如Kafka使用分區加主備模式,就略微麻煩一些。須要保證每個分區內的高可用性,也就是每個分區至少要有一個主備且須要作數據的同步,關於這塊HA的細節,能夠參考下篇pull模型消息系統設計。
消息到達服務端若是不通過任何處理就到接收者了,broker就失去了它的意義。爲了知足咱們錯峯/流控/最終可達等一系列需求,把消息存儲下來,而後選擇時機投遞就顯得是瓜熟蒂落的了。
只是這個存儲能夠作成不少方式。好比存儲在內存裏,存儲在分佈式KV裏,存儲在磁盤裏,存儲在數據庫裏等等。但歸結起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),而且理論上能承載更大限度的消息堆積(外存的空間遠大於內存)。
但並非每種消息都須要持久化存儲。不少消息對於投遞性能的要求大於可靠性的要求,且數量極大(如日誌)。這時候,消息不落地直接暫存內存,嘗試幾回failover,最終投遞出去也何嘗不可。
市面上的消息隊列廣泛兩種形式都支持。固然具體的場景還要具體結合公司的業務來看。
咱們來看看若是須要數據落地的狀況下各類存儲子系統的選擇。理論上,從速度來看,文件系統>分佈式KV(持久化)>分佈式文件系統>數據庫,而可靠性卻截然相反。仍是要從支持的業務場景出發做出最合理的選擇,若是大家的消息隊列是用來支持支付/交易等對可靠性要求很是高,但對性能和量的要求沒有這麼高,並且沒有時間精力專門作文件存儲系統的研究,DB是最好的選擇。
可是DB受制於IOPS,若是要求單broker 5位數以上的QPS性能,基於文件的存儲是比較好的解決方案。總體上能夠採用數據文件+索引文件的方式處理,具體這塊的設計比較複雜,能夠參考下篇的存儲子系統設計。
分佈式KV(如MongoDB,HBase)等,或者持久化的Redis,因爲其編程接口較友好,性能也比較可觀,若是在可靠性要求不是那麼高的場景,也不失爲一個不錯的選擇。
如今咱們的消息隊列初步具有了轉儲消息的能力。下面一個重要的事情就是解析發送接收關係,進行正確的消息投遞了。
市面上的消息隊列定義了一堆讓人暈頭轉向的名詞,如JMS 規範中的Topic/Queue,Kafka裏面的Topic/Partition/ConsumerGroup,RabbitMQ裏面的Exchange等等。拋開現象看本質,無外乎是單播與廣播的區別。所謂單播,就是點到點;而廣播,是一點對多點。固然,對於互聯網的大部分應用來講,組間廣播、組內單播是最多見的情形。
消息須要通知到多個業務集羣,而一個業務集羣內有不少臺機器,只要一臺機器消費這個消息就能夠了。
固然這不是絕對的,不少時候組內的廣播也是有適用場景的,如本地緩存的更新等等。另外,消費關係除了組內組間,可能會有多級樹狀關係。這種狀況太過於複雜,通常不列入考慮範圍。因此,通常比較通用的設計是支持組間廣播,不一樣的組註冊不一樣的訂閱。組內的不一樣機器,若是註冊一個相同的ID,則單播;若是註冊不一樣的ID(如IP地址+端口),則廣播。
至於廣播關係的維護,通常因爲消息隊列自己都是集羣,因此都維護在公共存儲上,如config server、zookeeper等。維護廣播關係所要作的事情基本是一致的:
上面都是些消息隊列基本功能的實現,下面來看一些關於消息隊列特性相關的內容,無論可靠投遞/消息丟失與重複以及事務乃至於性能,不是每一個消息隊列都會照顧到,因此要依照業務的需求,來仔細衡量各類特性實現的成本,利弊,最終作出最爲合理的設計。
這是個激動人心的話題,徹底不丟消息,究竟可不可能?答案是,徹底可能,前提是消息可能會重複,而且,在異常狀況下,要接受消息的延遲。
方案說簡單也簡單,就是每當要發生不可靠的事情(RPC等)以前,先將消息落地,而後發送。當失敗或者不知道成功失敗(好比超時)時,消息狀態是待發送,定時任務不停輪詢全部待發送消息,最終必定能夠送達。
具體來講:
對於各類不肯定(超時、down機、消息沒有送達、送達後數據沒落地、數據落地了回覆沒收到),其實對於發送方來講,都是一件事情,就是消息沒有送達。
重推消息所面臨的問題就是消息重複。重複和丟失就像兩個噩夢,你必需要面對一個。好在消息重複還有處理的機會,消息丟失再想找回就難了。
Anyway,做爲一個成熟的消息隊列,應該儘可能在各個環節減小重複投遞的可能性,不能由於重複有解決方案就放縱的亂投遞。
最後說一句,不是全部的系統都要求最終一致性或者可靠投遞,好比一個論壇系統、一個招聘系統。一個重複的簡歷或話題被髮布,可能比丟失了一個發佈顯得更讓用戶沒法接受。不斷重複一句話,任何基礎組件要服務於業務場景。
當broker把消息投遞給消費者後,消費者能夠當即響應我收到了這個消息。但收到了這個消息只是第一步,我能不能處理這個消息卻不必定。或許由於消費能力的問題,系統的負荷已經不能處理這個消息;或者是剛纔狀態機裏面提到的消息不是我想要接收的消息,主動要求重發。
把消息的送達和消息的處理分開,這樣才真正的實現了消息隊列的本質-解耦。因此,容許消費者主動進行消費確認是必要的。固然,對於沒有特殊邏輯的消息,默認Auto Ack也是能夠的,但必定要容許消費方主動ack。
對於正確消費ack的,沒什麼特殊的。可是對於reject和error,須要特別說明。reject這件事情,每每業務方是沒法感知到的,系統的流量和健康情況的評估,以及處理能力的評估是一件很是複雜的事情。舉個極端的例子,收到一個消息開始build索引,可能這個消息要處理半個小時,但消息量倒是很是的小。因此reject這塊建議作成滑動窗口/線程池相似的模型來控制,
消費能力不匹配的時候,直接拒絕,過一段時間重發,減小業務的負擔。
但業務出錯這件事情是隻有業務方本身知道的,就像上文提到的狀態機等等。這時應該容許業務方主動ack error,並能夠與broker約定下次投遞的時間。
上文談到重複消息是不可能100%避免的,除非能夠容許丟失,那麼,順序消息可否100%知足呢? 答案是能夠,但條件更爲苛刻:
因此絕對的順序消息基本上是不能實現的,固然在METAQ/Kafka等pull模型的消息隊列中,單線程生產/消費,排除消息丟失,也是一種順序消息的解決方案。
通常來說,一個主流消息隊列的設計範式裏,應該是不丟消息的前提下,儘可能減小重複消息,不保證消息的投遞順序。
談到重複消息,主要是兩個話題:
先來看看第一個話題,每個消息應該有它的惟一身份。無論是業務方自定義的,仍是根據IP/PID/時間戳生成的MessageId,若是有地方記錄這個MessageId,消息到來是可以進行比對就
能完成重複的鑑定。數據庫的惟一鍵/bloom filter/分佈式KV中的key,都是不錯的選擇。因爲消息不能被永久存儲,因此理論上都存在消息從持久化存儲移除的瞬間上游還在投遞的可能(上游因種種緣由投遞失敗,不停重試,都到了下游清理消息的時間)。這種事情都是異常狀況下才會發生的,畢竟是小衆狀況。兩分鐘消息都還沒送達,多送一次又能怎樣呢?冪等的處理消息是一門藝術,由於種種緣由重複消息或者錯亂的消息仍是來到了,說兩種通用的解決方案:
舉個簡單的例子,一個產品的狀態有上線/下線狀態。若是消息1是下線,消息2是上線。不巧消息1判重失敗,被投遞了兩次,且第二次發生在2以後,若是不作重複性判斷,顯然最終狀態是錯誤的。
可是,若是每一個消息自帶一個版本號。上游發送的時候,標記消息1版本號是1,消息2版本號是2。若是再發送下線消息,則版本號標記爲3。下游對於每次消息的處理,同時維護一個版本號。
每次只接受比當前版本號大的消息。初始版本爲0,當消息1到達時,將版本號更新爲1。消息2到來時,由於版本號>1.能夠接收,同時更新版本號爲2.當另外一條下線消息到來時,若是版本號是3.則是真實的下線消息。若是是1,則是重複投遞的消息。
若是業務方只關心消息重複不重複,那麼問題就已經解決了。但不少時候另外一個頭疼的問題來了,就是消息順序若是和想象的順序不一致。好比應該的順序是12,到來的順序是21。則最後會發生狀態錯誤。
參考TCP/IP協議,若是想讓亂序的消息最後可以正確的被組織,那麼就應該只接收比當前版本號大一的消息。而且在一個session週期內要一直保存各個消息的版本號。
若是到來的順序是21,則先把2存起來,待2到來後,再處理1,這樣重複性和順序性要求就都達到了。
基於版本號來處理重複和順序消息聽起來是個不錯的主意,但凡事總有瑕疵。使用版本號的最大問題是:
還不能只存儲最新的版本號的消息,要把亂序到來的消息都存儲起來。並且必需要對此作出處理。試想一個永不過時的"session",好比一個物品的狀態,會不停流轉於上下線。那麼中間環節的全部存儲
就必須保留,直到在某個版本號以前的版本一個不丟的到來,成本過高。
就剛纔的場景看,若是消息沒有版本號,該怎麼解決呢?業務方只須要本身維護一個狀態機,定義各類狀態的流轉關係。例如,"下線"狀態只容許接收"上線"消息,「上線」狀態只能接收「下線消息」,若是上線收到上線消息,或者下線收到下線消息,在消息不丟失和上游業務正確的前提下。要麼是消息發重了,要麼是順序到達反了。這時消費者只須要把「我不能處理這個消息」告訴投遞者,要求投遞者過一段時間重發便可。並且重發必定要有次數限制,好比5次,避免死循環,就解決了。
舉例子說明,假設產品自己狀態是下線,1是上線消息,2是下線消息,3是上線消息,正常狀況下,消息應該的到來順序是123,但實際狀況下收到的消息狀態變成了3123。
那麼下游收到3消息的時候,判斷狀態機流轉是下線->上線,能夠接收消息。而後收到消息1,發現是上線->上線,拒絕接收,要求重發。而後收到消息2,狀態是上線->下線,因而接收這個消息。
此時不管重發的消息1或者3到來,仍是能夠接收。另外的重發,在必定次數拒絕後中止重發,業務正確。
迴歸到消息隊列的話題來說。上述通用的版本號/狀態機/ID判重解決方案裏,哪些是消息隊列該作的、哪些是消息隊列不應作業務方處理的呢?其實這裏沒有一個徹底嚴格的定義,但回到咱們的出發點,咱們保證不丟失消息的狀況下儘可能少重複消息,消費順序不保證。那麼重複消息下和亂序消息下業務的正確,應該是由消費方保證的,咱們要作的是減小消息發送的重複。
咱們沒法定義業務方的業務版本號/狀態機,若是API裏強制須要指定版本號,則顯得過於綁架客戶了。何況,在消費方維護這麼多狀態,就涉及到一個消費方的消息落地/多機間的同步消費狀態問題,複雜度指數級上升,並且只能解決部分問題。
減小重複消息的關鍵步驟:
持久性是事務的一個特性,然而只知足持久性卻不必定能知足事務的特性。仍是拿扣錢/加錢的例子講。知足事務的一致性特徵,則必需要麼都不進行,要麼都能成功。
解決方案從大方向上有兩種:
分佈式事務存在的最大問題是成本過高,兩階段提交協議,對於仲裁down機或者單點故障,幾乎是一個無解的黑洞。對於交易密集型或者I/O密集型的應用,沒有辦法承受這麼高的網絡延遲,系統複雜性。
而且成熟的分佈式事務必定構建與比較靠譜的商用DB和商用中間件上,成本也過高。
那如何使用本地事務解決分佈式事務的問題呢?以本地和業務在一個數據庫實例中建表爲例子,與扣錢的業務操做同一個事務裏,將消息插入本地數據庫。若是消息入庫失敗,則業務回滾;若是消息入庫成功,事務提交。
而後發送消息(注意這裏能夠實時發送,不須要等定時任務檢出,以提升消息實時性)。之後的問題就是前文的最終一致性問題所提到的了,只要消息沒有發送成功,就一直靠定時任務重試。
這裏有一個關鍵的點,本地事務作的,是業務落地和消息落地的事務,而不是業務落地和RPC成功的事務。這裏不少人容易混淆,若是是後者,無疑是事務嵌套RPC,是大忌,會有長事務死鎖等各類風險。
而消息只要成功落地,很大程度上就沒有丟失的風險(磁盤物理損壞除外)。而消息只要投遞到服務端確認後本地才作刪除,就完成了producer->broker的可靠投遞,而且當消息存儲異常時,業務也是能夠回滾的。
本地事務存在兩個最大的使用障礙:
話說回來,不是每一個業務都須要強事務的。扣錢和加錢須要事務保證,但下單和生成短信卻不須要事務,不能由於要求發短信的消息存儲投遞失敗而要求下單業務回滾。因此,一個完整的消息隊列應該定義清楚本身能夠投遞的消息類型,如事務型消息,本地非持久型消息,以及服務端不落地的非可靠消息等。對不一樣的業務場景作不一樣的選擇。另外事務的使用應該儘可能低成本、透明化,能夠依託於現有的成熟框架,如Spring的聲明式事務作擴展。業務方只須要使用@Transactional標籤便可。
首先澄清一個概念,異步,同步和oneway是三件事。異步,歸根結底你仍是須要關心結果的,但可能不是當時的時間點關心,能夠用輪詢或者回調等方式處理結果;同步是須要當時關心
的結果的;而oneway是發出去就無論死活的方式,這種對於某些徹底對可靠性沒有要求的場景仍是適用的,但不是咱們重點討論的範疇。
迴歸來看,任何的RPC都是存在客戶端異步與服務端異步的,並且是能夠任意組合的:客戶端同步對服務端異步,客戶端異步對服務端異步,客戶端同步對服務端同步,客戶端異步對服務端同步。
對於客戶端來講,同步與異步主要是拿到一個Result,仍是Future(Listenable)的區別。實現方式能夠是線程池,NIO或者其餘事件機制,這裏先不展開講。
服務端異步可能稍微難理解一點,這個是須要RPC協議支持的。參考servlet 3.0規範,服務端能夠吐一個future給客戶端,而且在future done的時候通知客戶端。
整個過程能夠參考下面的代碼:
客戶端同步服務端異步。
Future<Result> future = request(server);//server馬上返回future synchronized(future){ while(!future.isDone()){ future.wait();//server處理結束後會notify這個future,並修改isdone標誌 } } return future.get();
客戶端同步服務端同步。
Result result = request(server);
客戶端異步服務端同步(這裏用線程池的方式)。
Future<Result> future = executor.submit(new Callable(){public void call<Result>(){ result = request(server); }}) return future;
客戶端異步服務端異步。
Future<Result> future = request(server);//server馬上返回future return future
上面說了這麼多,實際上是想讓你們脫離兩個誤區:
那麼,服務端使用異步最大的好處是什麼呢?說到底,是解放了線程和I/O。試想服務端有一堆I/O等待處理,若是每一個請求都須要同步響應,每條消息都須要結果馬上返回,那麼就幾乎無法作I/O合併
(固然接口能夠設計成batch的,但可能batch發過來的仍然數量較少)。而若是用異步的方式返回給客戶端future,就能夠有機會進行I/O的合併,把幾個批次發過來的消息一塊兒落地(這種合併對於MySQL等容許batch insert的數據庫效果尤爲明顯),而且完全釋放了線程。不至於說來多少請求開多少線程,可以支持的併發量直線提升。
來看第二個誤區,返回future的方式不必定只有線程池。換句話說,能夠在線程池裏面進行同步操做,也能夠進行異步操做,也能夠不使用線程池使用異步操做(NIO、事件)。
回到消息隊列的議題上,咱們固然不但願消息的發送阻塞主流程(前面提到了,server端若是使用異步模型,則可能因消息合併帶來必定程度上的消息延遲),因此能夠先使用線程池提交一個發送請求,主流程繼續往下走。
可是線程池中的請求關心結果嗎?Of course,必須等待服務端消息成功落地,纔算是消息發送成功。因此這裏的模型,準確地說事客戶端半同步半異步(使用線程池不阻塞主流程,但線程池中的任務須要等待server端的返回),server端是純異步。客戶端的線程池wait在server端吐回的future上,直到server端處理完畢,才解除阻塞繼續進行。
總結一句,同步可以保證結果,異步可以保證效率,要合理的結合才能作到最好的效率。
談到批量就不得不提生產者消費者模型。但生產者消費者模型中最大的痛點是:消費者到底應該什麼時候進行消費。大處着眼來看,消費動做都是事件驅動的。主要事件包括:
對於及時性要求高的數據,可用採用方式3來完成,好比客戶端向服務端投遞數據。只要隊列有數據,就把隊列中的全部數據刷出,不然將本身掛起,等待新數據的到來。
在第一次把隊列數據往外刷的過程當中,又積攢了一部分數據,第二次又能夠造成一個批量。僞代碼以下:
Executor executor = Executors.newFixedThreadPool(4); final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(); private Runnable task = new Runnable({//這裏因爲共享隊列,Runnable能夠複用,故作成全局的 public void run(){ List<Message> messages = new ArrayList<>(20); queue.drainTo(messages,20); doSend(messages);//阻塞,在這個過程當中會有新的消息到來,若是4個線程都佔滿,隊列就有機會囤新的消息 } }); public void send(Message message){ queue.offer(message); executor.submit(task) }
這種方式是消息延遲和批量的一個比較好的平衡,但優先響應低延遲。延遲的最高程度由上一次發送的等待時間決定。但可能形成的問題是發送過快的話批量的大小不夠知足性能的極致。
Executor executor = Executors.newFixedThreadPool(4); final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(); volatile long last = System.currentMills(); Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){ flush(); },500,500,TimeUnits.MILLS); private Runnable task = new Runnable({//這裏因爲共享隊列,Runnable能夠複用,顧作成全局的。 public void run(){ List<Message> messages = new ArrayList<>(20); queue.drainTo(messages,20); doSend(messages);//阻塞,在這個過程當中會有新的消息到來,若是4個線程都佔滿,隊列就有機會屯新的消息。 } }); public void send(Message message){ last = System.currentMills(); queue.offer(message); flush(); } private void flush(){ if(queue.size>200||System.currentMills()-last>200){ executor.submit(task) } }
相反對於能夠用適量的延遲來換取高性能的場景來講,用定時/定量二選一的方式可能會更爲理想,既到達必定數量才發送,但若是數量一直達不到,也不能幹等,有一個時間上限。
具體說來,在上文的submit以前,多判斷一個時間和數量,而且Runnable內部維護一個定時器,避免沒有新任務到來時舊的任務永遠沒有機會觸發發送條件。對於server端的數據落地,使用這種方式就很是方便。
最後囉嗦幾句,曾經有人問我,爲何網絡請求小包合併成大包會提升性能?主要緣由有兩個:
上文提到的消息隊列,大可能是針對push模型的設計。如今市面上有不少經典的也比較成熟的pull模型的消息隊列,如Kafka、MetaQ等。這跟JMS中傳統的push方式有很大的區別,可謂另闢蹊徑。
咱們簡要分析下push和pull模型各自存在的利弊。
慢消費無疑是push模型最大的致命傷,穿成流水線來看,若是消費者的速度比發送者的速度慢不少,勢必形成消息在broker的堆積。假設這些消息都是有用的沒法丟棄的,消息就要一直在broker端保存。固然這還不是最致命的,最致命的是broker給consumer推送一堆consumer沒法處理的消息,consumer不是reject就是error,而後來回踢皮球。
反觀pull模式,consumer能夠按需消費,不用擔憂本身處理不了的消息來騷擾本身,而broker堆積消息也會相對簡單,無需記錄每個要發送消息的狀態,只須要維護全部消息的隊列和偏移量就能夠了。因此對於創建索引等慢消費,消息量有限且到來的速度不均勻的狀況,pull模式比較合適。
這是pull模式最大的短板。因爲主動權在消費方,消費方沒法準確地決定什麼時候去拉取最新的消息。若是一次pull取到消息了還能夠繼續去pull,若是沒有pull取到則須要等待一段時間從新pull。
但等待多久就很難斷定了。你可能會說,我能夠有xx動態pull取時間調整算法,但問題的本質在於,有沒有消息到來這件事情決定權不在消費方。也許1分鐘內連續來了1000條消息,而後半個小時沒有新消息產生,
可能你的算法算出下次最有可能到來的時間點是31分鐘以後,或者60分鐘以後,結果下條消息10分鐘後到了,是否是很讓人沮喪?
固然也不是說延遲就沒有解決方案了,業界較成熟的作法是從短期開始(不會對broker有太大負擔),而後指數級增加等待。好比開始等5ms,而後10ms,而後20ms,而後40ms……直到有消息到來,而後再回到5ms。
即便這樣,依然存在延遲問題:假設40ms到80ms之間的50ms消息到來,消息就延遲了30ms,並且對於半個小時來一次的消息,這些開銷就是白白浪費的。
在阿里的RocketMq裏,有一種優化的作法-長輪詢,來平衡推拉模型各自的缺點。基本思路是:消費者若是嘗試拉取失敗,不是直接return,而是把鏈接掛在那裏wait,服務端若是有新的消息到來,把鏈接notify起來,這也是不錯的思路。但海量的長鏈接block對系統的開銷仍是不容小覷的,仍是要合理的評估時間間隔,給wait加一個時間上限比較好~
若是push模式的消息隊列,支持分區,單分區只支持一個消費者消費,而且消費者只有確認一個消息消費後才能push送另一個消息,還要發送者保證全局順序惟一,聽起來也能作順序消息,但成本過高了,尤爲是必須每一個消息消費確認後才能發下一條消息,這對於自己堆積能力和慢消費就是瓶頸的push模式的消息隊列,簡直是一場災難。
反觀pull模式,若是想作到全局順序消息,就相對容易不少:
因此對於日誌push送這種最好全局有序,但容許出現小偏差的場景,pull模式很是合適。若是你不想看到通篇亂套的日誌~~
Anyway,須要順序消息的場景仍是比較有限的並且成本過高,請慎重考慮。
本文從爲什麼使用消息隊列開始講起,而後主要介紹瞭如何從零開始設計一個消息隊列,包括RPC、事務、最終一致性、廣播、消息確認等關鍵問題。並對消息隊列的push、pull模型作了簡要分析,最後從批量和異步角度,分析了消息隊列性能優化的思路。下篇會着重介紹一些高級話題,如存儲系統的設計、流控和錯峯的設計、公平調度等。但願經過這些,讓你們對消息隊列有個提綱挈領的總體認識,並給自主開發消息隊列提供思路。另外,本文主要是源自本身在開發消息隊列中的思考和讀源碼時的體會,比較不"官方",也不免會存在一些漏洞,歡迎你們多多交流。
後續咱們還會推出消息隊列設計高級篇,內容會涵蓋如下方面:
- pull模型消息系統設計理念
- 存儲子系統設計
- 流量控制
- 公平調度
敬請期待哦~
王燁,如今是美團旅遊後臺研發組的程序猿,以前曾經在百度、去哪和優酷工做過,專一Java後臺開發。對於網絡編程和併發編程具備濃厚的興趣,曾經作過一些基礎組件,也翻過一些源碼,屬於比較典型的宅男技術控。期待可以與更多知己,在coding的路上並肩前行~