初期,公司內部沒有專門的團隊維護消息隊列服務,因此消息隊列使用方式較多,主要以Kafka爲主,有業務直連的,也有經過獨立的服務轉發消息的。另外有一些團隊也會用RocketMQ、Redis的list,甚至會用比較非主流的beanstalkkd。致使的結果就是,比較混亂,沒法維護,資源使用也很浪費。緩存
一個核心業務在使用Kafka的時候,出現了集羣數據寫入抖動很是嚴重的狀況,常常會有數據寫失敗。架構
主要有兩點緣由:app
隨着業務增加,Topic的數據增多,集羣負載增大,性能降低;框架
咱們用的是Kafka0.8.2那個版本,有個bug,會致使副本從新複製,複製的時候有大量的讀,咱們存儲盤用的是機械盤,致使磁盤IO過大,影響寫入。運維
因此咱們決定作本身的消息隊列服務。異步
首先須要解決業務方消息生產失敗的問題。由於這個Kafka用的是發佈/訂閱模式,一個topic的訂閱方會有不少,涉及到的下游業務也就很是多,沒辦法一口氣直接替換Kafka,遷移到新的一個消息隊列服務上。因此咱們當時的方案是加了一層代理,而後利用codis做爲緩存,解決了Kafka不按期寫入失敗的問題。當後面的Kafka出現不可寫入的時候,咱們就會先把數據寫入到codis中,而後延時進行重試,直到寫成功爲止。工具
通過一系列的調研和測試以後,咱們決定採用RocketMQ。性能
爲了支持多語言環境、解決一些遷移和某些業務的特殊需求,咱們又在消費側加上了一個代理服務。而後造成了這麼一個核心框架。 業務端只跟代理層交互。中間的消息引擎,負責消息的核心存儲。在以前的基本框架以後,咱們後面就主要圍繞三個方向作。測試
咱們消息隊列服務的一個比較新的現狀,先縱向看,上面是生產的客戶端,包括了7種語言。而後是咱們的生產代理服務。在中間的是咱們的消息存儲層。目前主要的消息存儲引擎是RocketMQ。而後還有一些在遷移過程當中的Kafka。另外一個是Chronos,它是咱們延遲消息的一個存儲引擎。優化
再下面就是消費代理。消費代理一樣提供了多種語言的客戶端,還支持多種協議的消息主動推送功能,包括HTTP 協議 RESTful方式。結合咱們的groovy腳本功能,還能實現將消息直接轉存到Redis、Hbase和HDFS上。此外,咱們還在陸續接入更多的下游存儲。
除了存儲系統以外,咱們也對接了實時計算平臺,例如Flink,Spark,Storm,旁邊是咱們的用戶控制檯和運維控制檯。這個是咱們服務化的重點。用戶在須要使用隊列的時候,就經過界面申請Topic,填寫各類信息,包括身份信息,消息的峯值流量,消息大小,消息格式等等。而後消費方經過咱們的界面,就能夠申請消費。
運維控制檯,主要負責咱們集羣的管理,自動化部署,流量調度,狀態顯示之類的功能。最後全部運維和用戶操做會影響線上的配置,都會經過ZooKeeper進行同步。
先看一組數據,用的是Kafka,開啓消費,每條消息大小爲2048字節能夠看到,隨着Topic數量增長,到256 Topic以後,吞吐極具降低。而後是RocketMQ。能夠看到,Topic增大以後,影響很是小。第三組和第四組,是上面兩組關閉了消費的狀況。結論基本相似,總體吞吐量會高那麼一點點。
下面的四組跟上面的區別是使用了128字節的小消息體。能夠看到,Kafka吞吐受Topic數量的影響特別明顯。對比來看,雖然topic比較小的時候,RocketMQ吞吐較小,可是基本很是穩定,對於咱們這種共享集羣來講比較友好。
面臨的挑戰(順時針看)
客戶端語言,須要支持PHP、Go、Java、C++;
只有3個開發人員;
決定用RocketMQ,可是沒看過源碼;
上線時間緊,線上的Kafka還有問題;
可用性要求高。
使用RocketMQ時的兩個問題:
客戶端語言支持不全,以Java爲主,而咱們還須要支持PHP、Go、C++;
功能特別多,如tag、property、消費過濾、RETRYtopic、死信隊列、延遲消費之類的功能,但這對咱們穩定性維護來講,挑戰很是大。
使用ThriftRPC框架來解決跨語言的問題;
簡化調用接口。能夠認爲只有兩個接口,send用來生產,pull用來消費。
主要策略就是堅持KISS原則(Keep it simple, stupid),保持簡單,先解決最主要的問題,讓消息可以流轉起來。而後咱們把其餘主要邏輯都放在了proxy這一層來作,好比限流、權限認證、消息過濾、格式轉化之類的。這樣,咱們就能儘量地簡化客戶端的實現邏輯,不須要把不少功能用各類語言都寫一遍。
遷移這個事情,在pub-sub的消息模型下,會比較複雜。由於下游的數據消費方可能不少,上游的數據無法作到一刀切流量,這就會致使整個遷移的週期特別長。而後咱們爲了儘量地減小業務遷移的負擔,加快遷移的效率,咱們在Proxy層提供了雙寫和雙讀的功能。
雙寫:ProcucerProxy同時寫RocketMQ和Kafka;
雙讀:ConsumerProxy同時從RocketMQ和Kafka消費數據。
有了這兩個功能以後,咱們就能提供如下兩種遷移方案了。
生產端雙寫,同時往Kafka和RocketMQ寫一樣的數據,保證兩邊在整個遷移過程當中都有一樣的全量數據。 Kafka和RocketMQ有相同的數據,這樣下游的業務也就能夠開始遷移。若是消費端不關心丟數據,那麼能夠直接切換,切完直接更新消費進度。 若是須要保證消費必達,能夠先在ConsumerProxy設置消費進度,消費客戶端保證沒有數據堆積後再去遷移,這樣會有一些重複消息,通常客戶端會保證消費處理的冪等。
業務那邊不停原來的kafka 客戶端。只是加上咱們的客戶端,往RocketMQ裏追加寫。這種方案在整個遷移完成以後,業務還須要把老的寫入停掉。至關於兩次上線。
業務方直接切換生產的客戶端,只往咱們的proxy上寫數據。而後咱們的proxy負責把數據複製,同時寫到兩個存儲引擎中。這樣在遷移完成以後,咱們只須要在Proxy上關掉雙寫功能就能夠了。對生產的業務方來講是無感知的,生產方全程只須要改造一次,上一下線就能夠了。
因此表面看起來,應該仍是第二種方案更加簡單。可是,從總體可靠性的角度來看,通常仍是認爲第一種相對高一點。由於客戶端到Kafka這一條鏈路,業務以前都已經跑穩定了。通常不會出問題。可是寫咱們Proxy就不必定了,在接入過程當中,是有可能出現一些使用上的問題,致使數據寫入失敗,這就對業務方測試質量的要求會高一點。而後消費的遷移過程,其實風險是相對比較低的。出問題的時候,能夠當即回滾。由於它在老的Kafka上消費進度,是一直保留的,並且在遷移過程當中,能夠認爲是全量雙消費。
以上就是數據雙寫的遷移方案,這種方案的特色就是兩個存儲引擎都有相同的全量數據。
特色:保證不會重複消費。對於P2P 或者消費下游不太多,或者對重複消費數據比較敏感的場景比較適用。
這個方案的過程是這樣的,消費先切換。所有遷移到到咱們的Proxy上消費,Proxy從Kafka上獲取。這個時候RocketMQ上沒有流量。可是咱們的消費Proxy保證了雙消費,一旦RocketMQ有流量了,客戶端一樣也能收到。而後生產方改造客戶端,直接切流到RocketMQ中,這樣就完成了整個流量遷移過程。運行一段時間,好比Kafka裏的數據都過時以後,就能夠把消費Proxy上的雙消費關了,下掉Kafka集羣。
整個過程當中,生產直接切流,因此數據不會重複存儲。而後在消費遷移的過程當中,咱們消費Proxy上的group和業務原有的group能夠用一個名字,這樣就能實現遷移過程當中自動rebalance,這樣就能實現沒有大量重複數據的效果。因此這個方案對重複消費比較敏感的業務會比較適合的。這個方案的整個過程當中,消費方和生產方都只須要改造一遍客戶端,上一次線就能夠完成。
說完遷移方案,這裏再簡單介紹一下,咱們在本身的RocketMQ分支上作的一些比較重要的事情。
首先一個很是重要的一點是主從的自動切換。
熟悉RocketMQ的同窗應該知道,目前開源版本的RocketMQ broker 是沒有主從自動切換的。若是你的Master掛了,那你就寫不進去了。而後slave只能提供只讀的功能。固然若是你的topic在多個主節點上都建立了,雖然不會徹底寫不進去,可是對單分片順序消費的場景,仍是會產生影響。因此呢,咱們就本身加了一套主從自動切換的功能。
第二個是批量生產的功能。
RocketMQ4.0以後的版本是支持批量生產功能的。可是限制了,只能是同一個ConsumerQueue的。這個對於咱們的Proxy服務來講,不太友好,由於咱們的proxy是有多個不一樣的topic的,因此咱們就擴展了一下,讓它可以支持不一樣Topic、不一樣Consume Queue。原理上其實差很少,只是在傳輸的時候,把Topic和Consumer Queue的信息都編碼進去。
第三個,元信息管理的改造。
目前RocketMQ單機可以支持的Topic數量,基本在幾萬這麼一個量級,在增長上去以後,元信息的管理就會很是耗時,對整個吞吐的性能影響相對來講就會很是大。而後咱們有個場景又須要支持單機百萬左右的Topic數量,因此咱們就改造了一下元信息管理部分,讓RocketMQ單機可以支撐的Topic數量達到了百萬。
後面一些就不過重要了,好比集成了咱們公司內部的一些監控和部署工具,修了幾個bug,也給提了PR。最新版都已經fix掉了。
在RocketMQ在使用和運維上的一些經驗。主要是涉及在磁盤IO性能不夠的時候,一些參數的調整。
1 讀老數據的問題
咱們都知道,RocketMQ的數據是要落盤的,通常只有最新寫入的數據纔會在PageCache中。好比下游消費數據,由於一些緣由停了一天以後,又忽然起來消費數據。這個時候就須要讀磁盤上的數據。而後RocketMQ的消息體是所有存儲在一個append only的 commitlog 中的。若是這個集羣中混雜了不少不一樣topic的數據的話,要讀的兩條消息就頗有可能間隔很遠。最壞狀況就是一次磁盤IO讀一條消息。這就基本等價於隨機讀取了。若是磁盤的IOPS(Input/Output Operations Per Second)扛不住,還會影響數據的寫入,這個問題就嚴重了。
值得慶幸的是,RocketMQ提供了自動從Slave讀取老數據的功能。這個功能主要由slaveReadEnable這個參數控制。默認是關的(slaveReadEnable = false bydefault)。推薦把它打開,主從都要開。這個參數打開以後,在客戶端消費數據時,會判斷,當前讀取消息的物理偏移量跟最新的位置的差值,是否是超過了內存容量的一個百分比(accessMessageInMemoryMaxRatio= 40 by default)。若是超過了,就會告訴客戶端去備機上消費數據。若是採用異步主從,也就是brokerRole等於ASYNC_AMSTER的時候,你的備機IO打爆,其實影響不太大。可是若是你採用同步主從,那仍是有影響。因此這個時候,最好掛兩個備機。由於RocketMQ的主從同步複製,只要一個備機響應了確認寫入就能夠了,一臺IO打爆,問題不大。
2 過時數據刪除
RocketMQ默認數據保留72個小時(fileReservedTime=72)。而後它默認在凌晨4點開始刪過時數據(deleteWhen=」04」)。你能夠設置多個值用分號隔開。由於數據都是定時刪除的,因此在磁盤充足的狀況,數據的最長保留會比你設置的還多一天。又因爲默認都是同一時間,刪除一成天的數據,若是用了機械硬盤,通常磁盤容量會比較大,須要刪除的數據會特別多,這個就會致使在刪除數據的時候,磁盤IO被打滿。這個時候又要影響寫入了。
爲了解決這個問題,能夠嘗試多個方法,一個是設置文件刪除的間隔,有兩個參數能夠設置,
deleteCommitLogFilesInterval = 100(毫秒)。每刪除10個commitLog文件的時間間隔;
deleteConsumeQueueFilesInterval=100(毫秒)。每刪除一個ConsumeQueue文件的時間間隔。
另一個就是增長刪除頻率,把00-23都寫到deleteWhen,就能夠實現每一個小時都刪數據。
3 索引
默認狀況下,全部的broker都會創建索引(messageIndexEnable=true)。這個索引功能能夠支持按照消息的uniqId,消息的key來查詢消息體。索引文件實現的時候,本質上也就是基於磁盤的個一個hashmap。若是broker上消息數量比較多,查詢的頻率比較高,這也會形成必定的IO負載。因此咱們的推薦方案是在Master上關掉了index功能,只在slave上打開。而後全部的index查詢所有在slave上進行。固然這個須要簡單修改一下MQAdminImpl裏的實現。由於默認狀況下,它會向Master發出請求。