摘要: 本文整理自滴滴出行消息隊列負責人 江海挺 在Apache RocketMQ開發者沙龍北京站的分享。經過本文,您將瞭解到滴滴出行: 在消息隊列技術選型方面的思考; 爲何選擇 RocketMQ 做爲出行業務的消息隊列解決方案; 如何構建本身的消息隊列服務; 在 RocketMQ 上的擴展改造實踐; 在 RocketMQ 上的實踐經驗。緩存
本文整理自滴滴出行消息隊列負責人 江海挺 在Apache RocketMQ開發者沙龍北京站的分享。經過本文,您將瞭解到滴滴出行:架構
在消息隊列技術選型方面的思考;
爲何選擇 RocketMQ 做爲出行業務的消息隊列解決方案;
如何構建本身的消息隊列服務;
在 RocketMQ 上的擴展改造實踐;
在 RocketMQ 上的實踐經驗。app
江海挺:
滴滴出行消息隊列負責人,Apache RocketMQ Contributor,大學畢業後一直在作消息隊列領域相關的技術、產品和服務,積累了豐富的實踐經驗,沉澱了很多關於消息隊列的思考。框架
滴滴出行的消息技術選型
1.1 歷史運維
初期,公司內部沒有專門的團隊維護消息隊列服務,因此消息隊列使用方式較多,主要以Kafka爲主,有業務直連的,也有經過獨立的服務轉發消息的。另外有一些團隊也會用RocketMQ、Redis的list,甚至會用比較非主流的beanstalkkd。致使的結果就是,比較混亂,沒法維護,資源使用也很浪費。異步
1.2 爲何棄用 Kafka工具
一個核心業務在使用Kafka的時候,出現了集羣數據寫入抖動很是嚴重的狀況,常常會有數據寫失敗。性能
主要有兩點緣由:測試
隨着業務增加,Topic的數據增多,集羣負載增大,性能降低;
咱們用的是Kafka0.8.2那個版本,有個bug,會致使副本從新複製,複製的時候有大量的讀,咱們存儲盤用的是機械盤,致使磁盤IO過大,影響寫入。
因此咱們決定作本身的消息隊列服務。優化
首先須要解決業務方消息生產失敗的問題。由於這個Kafka用的是發佈/訂閱模式,一個topic的訂閱方會有不少,涉及到的下游業務也就很是多,沒辦法一口氣直接替換Kafka,遷移到新的一個消息隊列服務上。因此咱們當時的方案是加了一層代理,而後利用codis做爲緩存,解決了Kafka不按期寫入失敗的問題,如上圖。當後面的Kafka出現不可寫入的時候,咱們就會先把數據寫入到codis中,而後延時進行重試,直到寫成功爲止。
1.3 爲何選擇 RocketMQ
通過一系列的調研和測試以後,咱們決定採用RocketMQ,具體緣由在後面會介紹。
爲了支持多語言環境、解決一些遷移和某些業務的特殊需求,咱們又在消費側加上了一個代理服務。而後造成了這麼一個核心框架。業務端只跟代理層交互。中間的消息引擎,負責消息的核心存儲。在以前的基本框架以後,咱們後面就主要圍繞三個方向作。
遷移,把以前提到的全部五花八門的隊列環境,所有遷移到咱們上面。這裏面的遷移方案後面會跟你們介紹一下。
功能迭代和成本性能上的優化。
服務化,業務直接經過平臺界面來申請資源,申請到以後直接使用。
1.4 演進中的架構
這張圖是咱們消息隊列服務的一個比較新的現狀。先縱向看,上面是生產的客戶端,包括了7種語言。而後是咱們的生產代理服務。在中間的是咱們的消息存儲層。目前主要的消息存儲引擎是RocketMQ。而後還有一些在遷移過程當中的Kafka。另外一個是Chronos,它是咱們延遲消息的一個存儲引擎。
再下面就是消費代理。消費代理一樣提供了多種語言的客戶端,還支持多種協議的消息主動推送功能,包括HTTP 協議 RESTful方式。結合咱們的groovy腳本功能,還能實現將消息直接轉存到Redis、Hbase和HDFS上。此外,咱們還在陸續接入更多的下游存儲。
除了存儲系統以外,咱們也對接了實時計算平臺,例如Flink,Spark,Storm,左邊是咱們的用戶控制檯和運維控制檯。這個是咱們服務化的重點。用戶在須要使用隊列的時候,就經過界面申請Topic,填寫各類信息,包括身份信息,消息的峯值流量,消息大小,消息格式等等。而後消費方經過咱們的界面,就能夠申請消費。
運維控制檯,主要負責咱們集羣的管理,自動化部署,流量調度,狀態顯示之類的功能。最後全部運維和用戶操做會影響線上的配置,都會經過ZooKeeper進行同步。
爲何選擇RocketMQ
咱們圍繞如下兩個緯度進行了對比測試,結果顯示RocketMQ的效果更好。
2.1 測試-topic數量的支持
測試環境:Kafka 0.8.2,RocketMQ 3.4.6,1.0 Gbps Network,16 threads
測試結果以下:
這張圖是Kafka和RocketMQ在不一樣topic數量下的吞吐測試。橫座標是每秒消息數,縱座標是測試case。同時覆蓋了有無消費,和不一樣消息體的場景。一共8組測試數據,每組數據分別在Topic個數爲1六、3二、6四、12八、256時得到的,每一個topic包括8個Partition。下面四組數據是發送消息大小爲128字節的狀況,上面四種是發送2k消息大小的狀況。on 表示消息發送的時候,同時進行消息消費,off表示僅進行消息發送。
先看最上面一組數據,用的是Kafka,開啓消費,每條消息大小爲2048字節能夠看到,隨着Topic數量增長,到256 Topic以後,吞吐極具降低。第二組是是RocketMQ。能夠看到,Topic增大以後,影響很是小。第三組和第四組,是上面兩組關閉了消費的狀況。結論基本相似,總體吞吐量會高那麼一點點。
下面的四組跟上面的區別是使用了128字節的小消息體。能夠看到,Kafka吞吐受Topic數量的影響特別明顯。對比來看,雖然topic比較小的時候,RocketMQ吞吐較小,可是基本很是穩定,對於咱們這種共享集羣來講比較友好。
2.2 測試-延遲
Kafka
測試環境:Kafka 0.8.2.2,topic=1/8/32,Ack=1/all,replica=3
測試結果:
上面的一組的3條線對應Ack=3,須要3個備份都確認後才完成數據的寫入。下面的一組的3條線對應Ack=1,有1個備份收到數據後就能夠完成寫入。能夠看到下面一組只須要主備份確認的寫入,延遲明顯較低。每組的三條線之間主要是Topic數量的區別,Topic數量增長,延遲也增大了。
RocketMQ
測試環境:
RocketMQ 3.4.6,brokerRole=ASYNC/SYNC_MASTER, 2 Slave,
flushDiskType=SYNC_FLUSH/ASYNC_FLUSH
測試結果:
上面兩條是同步刷盤的狀況,延遲相對比較高。下面的是異步刷盤。橙色的線是同步主從,藍色的線是異步主從。而後能夠看到在副本同步複製的狀況下,即橙色的線,4w的TPS以內都不超過1ms。用這條橙色的線和上面Kafka的圖中的上面三條線橫向比較來看,Kafka超過1w TPS 就超過1ms了。Kafka的延遲明顯更高。
如何構建本身的消息隊列
3.1 問題與挑戰
面臨的挑戰(順時針看)
客戶端語言,須要支持PHP、Go、Java、C++;
只有3個開發人員;
決定用RocketMQ,可是沒看過源碼;
上線時間緊,線上的Kafka還有問題;
可用性要求高。
使用RocketMQ時的兩個問題:
客戶端語言支持不全,以Java爲主,而咱們還須要支持PHP、Go、C++;
功能特別多,如tag、property、消費過濾、RETRYtopic、死信隊列、延遲消費之類的功能,但這對咱們穩定性維護來講,挑戰很是大。
針對以上兩個問題的解決辦法,以下圖所示:
使用ThriftRPC框架來解決跨語言的問題;
簡化調用接口。能夠認爲只有兩個接口,send用來生產,pull用來消費。
主要策略就是堅持KISS原則(Keep it simple, stupid),保持簡單,先解決最主要的問題,讓消息可以流轉起來。而後咱們把其餘主要邏輯都放在了proxy這一層來作,好比限流、權限認證、消息過濾、格式轉化之類的。這樣,咱們就能儘量地簡化客戶端的實現邏輯,不須要把不少功能用各類語言都寫一遍。
3.2 遷移方案
架構肯定後,接下來是咱們的一個遷移過程。
遷移這個事情,在pub-sub的消息模型下,會比較複雜。由於下游的數據消費方可能不少,上游的數據無法作到一刀切流量,這就會致使整個遷移的週期特別長。而後咱們爲了儘量地減小業務遷移的負擔,加快遷移的效率,咱們在Proxy層提供了雙寫和雙讀的功能。
雙寫:ProcucerProxy同時寫RocketMQ和Kafka;
雙讀:ConsumerProxy同時從RocketMQ和Kafka消費數據。
有了這兩個功能以後,咱們就能提供如下兩種遷移方案了。
3.2.1 雙寫
生產端雙寫,同時往Kafka和RocketMQ寫一樣的數據,保證兩邊在整個遷移過程當中都有一樣的全量數據。Kafka和RocketMQ有相同的數據,這樣下游的業務也就能夠開始遷移。若是消費端不關心丟數據,那麼能夠直接切換,切完直接更新消費進度。若是須要保證消費必達,能夠先在ConsumerProxy設置消費進度,消費客戶端保證沒有數據堆積後再去遷移,這樣會有一些重複消息,通常客戶端會保證消費處理的冪等。
生產端的雙寫其實也有兩種方案:
客戶端雙寫,以下圖:
業務那邊不停原來的kafka 客戶端。只是加上咱們的客戶端,往RocketMQ裏追加寫。這種方案在整個遷移完成以後,業務還須要把老的寫入停掉。至關於兩次上線。
Producer Proxy雙寫,以下圖:
業務方直接切換生產的客戶端,只往咱們的proxy上寫數據。而後咱們的proxy負責把數據複製,同時寫到兩個存儲引擎中。這樣在遷移完成以後,咱們只須要在Proxy上關掉雙寫功能就能夠了。對生產的業務方來講是無感知的,生產方全程只須要改造一次,上一下線就能夠了。
因此表面看起來,應該仍是第二種方案更加簡單。可是,從總體可靠性的角度來看,通常仍是認爲第一種相對高一點。由於客戶端到Kafka這一條鏈路,業務以前都已經跑穩定了。通常不會出問題。可是寫咱們Proxy就不必定了,在接入過程當中,是有可能出現一些使用上的問題,致使數據寫入失敗,這就對業務方測試質量的要求會高一點。而後消費的遷移過程,其實風險是相對比較低的。出問題的時候,能夠當即回滾。由於它在老的Kafka上消費進度,是一直保留的,並且在遷移過程當中,能夠認爲是全量雙消費。
以上就是數據雙寫的遷移方案,這種方案的特色就是兩個存儲引擎都有相同的全量數據。
3.2.2 雙讀
特色:保證不會重複消費。對於P2P 或者消費下游不太多,或者對重複消費數據比較敏感的場景比較適用。
這個方案的過程是這樣的,消費先切換。所有遷移到到咱們的Proxy上消費,Proxy從Kafka上獲取。這個時候RocketMQ上沒有流量。可是咱們的消費Proxy保證了雙消費,一旦RocketMQ有流量了,客戶端一樣也能收到。而後生產方改造客戶端,直接切流到RocketMQ中,這樣就完成了整個流量遷移過程。運行一段時間,好比Kafka裏的數據都過時以後,就能夠把消費Proxy上的雙消費關了,下掉Kafka集羣。
整個過程當中,生產直接切流,因此數據不會重複存儲。而後在消費遷移的過程當中,咱們消費Proxy上的group和業務原有的group能夠用一個名字,這樣就能實現遷移過程當中自動rebalance,這樣就能實現沒有大量重複數據的效果。因此這個方案對重複消費比較敏感的業務會比較適合的。這個方案的整個過程當中,消費方和生產方都只須要改造一遍客戶端,上一次線就能夠完成。
RocketMQ擴展改造
說完遷移方案,這裏再簡單介紹一下,咱們在本身的RocketMQ分支上作的一些比較重要的事情。
首先一個很是重要的一點是主從的自動切換。
熟悉RocketMQ的同窗應該知道,目前開源版本的RocketMQ broker 是沒有主從自動切換的。若是你的Master掛了,那你就寫不進去了。而後slave只能提供只讀的功能。固然若是你的topic在多個主節點上都建立了,雖然不會徹底寫不進去,可是對單分片順序消費的場景,仍是會產生影響。因此呢,咱們就本身加了一套主從自動切換的功能。
第二個是批量生產的功能。
RocketMQ4.0以後的版本是支持批量生產功能的。可是限制了,只能是同一個ConsumerQueue的。這個對於咱們的Proxy服務來講,不太友好,由於咱們的proxy是有多個不一樣的topic的,因此咱們就擴展了一下,讓它可以支持不一樣Topic、不一樣Consume Queue。原理上其實差很少,只是在傳輸的時候,把Topic和Consumer Queue的信息都編碼進去。
第三個,元信息管理的改造。
目前RocketMQ單機可以支持的Topic數量,基本在幾萬這麼一個量級,在增長上去以後,元信息的管理就會很是耗時,對整個吞吐的性能影響相對來講就會很是大。而後咱們有個場景又須要支持單機百萬左右的Topic數量,因此咱們就改造了一下元信息管理部分,讓RocketMQ單機可以支撐的Topic數量達到了百萬。
後面一些就不過重要了,好比集成了咱們公司內部的一些監控和部署工具,修了幾個bug,也給提了PR。最新版都已經fix掉了。
RocketMQ使用經驗
接下來,再簡單介紹一下,咱們在RocketMQ在使用和運維上的一些經驗。主要是涉及在磁盤IO性能不夠的時候,一些參數的調整。
5.1 讀老數據的問題
咱們都知道,RocketMQ的數據是要落盤的,通常只有最新寫入的數據纔會在PageCache中。好比下游消費數據,由於一些緣由停了一天以後,又忽然起來消費數據。這個時候就須要讀磁盤上的數據。而後RocketMQ的消息體是所有存儲在一個append only的 commitlog 中的。若是這個集羣中混雜了不少不一樣topic的數據的話,要讀的兩條消息就頗有可能間隔很遠。最壞狀況就是一次磁盤IO讀一條消息。這就基本等價於隨機讀取了。若是磁盤的IOPS(Input/Output Operations Per Second)扛不住,還會影響數據的寫入,這個問題就嚴重了。
值得慶幸的是,RocketMQ提供了自動從Slave讀取老數據的功能。這個功能主要由slaveReadEnable這個參數控制。默認是關的(slaveReadEnable = false bydefault)。推薦把它打開,主從都要開。這個參數打開以後,在客戶端消費數據時,會判斷,當前讀取消息的物理偏移量跟最新的位置的差值,是否是超過了內存容量的一個百分比(accessMessageInMemoryMaxRatio= 40 by default)。若是超過了,就會告訴客戶端去備機上消費數據。若是採用異步主從,也就是brokerRole等於ASYNC_AMSTER的時候,你的備機IO打爆,其實影響不太大。可是若是你採用同步主從,那仍是有影響。因此這個時候,最好掛兩個備機。由於RocketMQ的主從同步複製,只要一個備機響應了確認寫入就能夠了,一臺IO打爆,問題不大。
5.2 過時數據刪除
RocketMQ默認數據保留72個小時(fileReservedTime=72)。而後它默認在凌晨4點開始刪過時數據(deleteWhen="04")。你能夠設置多個值用分號隔開。由於數據都是定時刪除的,因此在磁盤充足的狀況,數據的最長保留會比你設置的還多一天。又因爲默認都是同一時間,刪除一成天的數據,若是用了機械硬盤,通常磁盤容量會比較大,須要刪除的數據會特別多,這個就會致使在刪除數據的時候,磁盤IO被打滿。這個時候又要影響寫入了。
爲了解決這個問題,能夠嘗試多個方法,一個是設置文件刪除的間隔,有兩個參數能夠設置,
deleteCommitLogFilesInterval = 100(毫秒)。每刪除10個commitLog文件的時間間隔;
deleteConsumeQueueFilesInterval=100(毫秒)。每刪除一個ConsumeQueue文件的時間間隔。
另一個就是增長刪除頻率,把00-23都寫到deleteWhen,就能夠實現每一個小時都刪數據。
5.3 索引
默認狀況下,全部的broker都會創建索引(messageIndexEnable=true)。這個索引功能能夠支持按照消息的uniqId,消息的key來查詢消息體。索引文件實現的時候,本質上也就是基於磁盤的個一個hashmap。若是broker上消息數量比較多,查詢的頻率比較高,這也會形成必定的IO負載。因此咱們的推薦方案是在Master上關掉了index功能,只在slave上打開。而後全部的index查詢所有在slave上進行。固然這個須要簡單修改一下MQAdminImpl裏的實現。由於默認狀況下,它會向Master發出請求。