在好久以前寫過一篇Kafka相關的文章,你須要知道的Kafka,那個時候在業務上更多的是使用的是Kafka,而如今換了公司以後,更多的使用的是Rocketmq,本篇文章會盡力全面的介紹RocketMQ和Kafka各個關鍵點的比較,但願你們讀完能有所收穫。mysql
RocketMQ前身叫作MetaQ, 在MeataQ發佈3.0版本的時候更名爲RocketMQ,其本質上的設計思路和Kafka相似,可是和Kafka不一樣的是其使用Java進行開發,因爲在國內的Java受衆羣體遠遠多於Scala,因此RocketMQ是不少以Java語言爲主的公司的首選。一樣的RocketMQ和Kafka都是Apache基金會中的頂級項目,他們社區的活躍度都很是高,項目更新迭代也很是快。算法
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
直接定義好一個producer,建立好Message,調用send方法便可。sql
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
對於RocketMQ先拋出幾個問題:markdown
對於RocketMQ的架構圖,在大致上來看和Kafka並無太多的差異,可是在不少細節上是有不少差異的,接下來會一一進行講述。網絡
在3.1的架構中咱們有多個Producer,多個主Broker,多個從Broker,每一個Producer能夠對應多個Topic,每一個Consumer也能夠消費多個Topic。架構
Broker信息會上報至NameServer,Consumer會從NameServer中拉取Broker和Topic的信息。併發
不少朋友都在問什麼是無狀態呢?狀態的有無實際上就是數據是否會作存儲,有狀態的話數據會被持久化,無狀態的服務能夠理解就是一個內存服務,NameServer自己也是一個內存服務,全部數據都存儲在內存中,重啓以後都會丟失。框架
在RocketMQ中的每一條消息,都有一個Topic,用來區分不一樣的消息。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生產者寫入的新消息。異步
在Topic中有分爲了多個Queue,這實際上是咱們發送/讀取消息通道的最小單位,咱們發送消息都須要指定某個寫入某個Queue,拉取消息的時候也須要指定拉取某個Queue,因此咱們的順序消息能夠基於咱們的Queue維度保持隊列有序,若是想作到全局有序那麼須要將Queue大小設置爲1,這樣全部的數據都會在Queue中有序。socket
在上圖中咱們的Producer會經過一些策略進行Queue的選擇:
咱們同一組Consumer也會根據一些策略來選Queue,常見的好比平均分配或者一致性Hash分配。
要注意的是當Consumer出現下線或者上線的時候,這裏須要作重平衡,也就是Rebalance,RocketMQ的重平衡機制以下:
因爲重平衡是定時作的,因此這裏有可能會出現某個Queue同時被兩個Consumer消費,因此會出現消息重複投遞。
Kafka的重平衡機制和RocketMQ不一樣,Kafka的重平衡是經過Consumer和Coordinator聯繫來完成的,當Coordinator感知到消費組的變化,會在心跳過程當中發送重平衡的信號,而後由一個ConsumerLeader進行重平衡選擇,而後再由Coordinator將結果通知給全部的消費者。
在RocketMQ中Queue被分爲讀和寫兩種,在最開始接觸RocketMQ的時候一直覺得讀寫隊列數量配置不一致不會出現什麼問題的,好比當消費者機器不少的時候咱們配置不少讀的隊列,可是實際過程當中發現會出現消息沒法消費和根本沒有消息消費的狀況。
這個功能在RocketMQ在我看來明顯沒什麼用,由於基本上都會設置爲讀寫隊列大小同樣,這個爲啥不直接將其進行統一,反而容易讓用戶配置不同出現錯誤。
這個問題在RocketMQ的Issue裏也沒有收到好的答案。
通常來講消息隊列的消費模型分爲兩種,基於推送的消息(push)模型和基於拉取(poll)的消息模型。
基於推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者後,標記這條消息爲已經被消費,可是這種方式沒法很好地保證消費的處理語義。好比當咱們把已經把消息發送給消費者以後,因爲消費進程掛掉或者因爲網絡緣由沒有收到這條消息,若是咱們在消費代理將其標記爲已消費,這個消息就永久丟失了。若是咱們利用生產者收到消息後回覆這種方法,消息代理須要記錄消費狀態,這種不可取。
用過RocketMQ的同窗確定不由會想到,在RocketMQ中不是提供了兩種消費者嗎?
MQPullConsumer和MQPushConsumer,其中MQPushConsumer不就是咱們的推模型嗎?其實這兩種模型都是客戶端主動去拉消息,其中的實現區別以下:
消費模式咱們分爲兩種,集羣消費,廣播消費:
在Kafka中使用的原生的socket實現網絡通訊,而RocketMQ使用的是Netty網絡框架,如今愈來愈多的中間件都不會直接選擇原生的socket,而是使用的Netty框架,主要得益於下面幾個緣由:
選擇框架是一方面,而想要保證網絡通訊的高效,網絡線程模型也是一方面,咱們常見的有1+N(1個Acceptor線程,N個IO線程),1+N+M(1個acceptor線程,N個IO線程,M個worker線程)等模型,RocketMQ使用的是1+N1+N2+M的模型,以下圖所示:
1個acceptor線程,N1個IO線程,N2個線程用來作Shake-hand,SSL驗證,編解碼;M個線程用來作業務處理。這樣的好處將編解碼,和SSL驗證等一些可能耗時的操做放在了一個單獨的線程池,不會佔據咱們業務線程和IO線程。
作爲一個好的消息系統,高性能的存儲,高可用都不可少。
RocketMQ和Kafka的存儲核心設計有很大的不一樣,因此其在寫入性能方面也有很大的差異,這是16年阿里中間件團隊對RocketMQ和Kafka不一樣Topic下作的性能測試:
從圖上能夠看出:
那RocketMQ爲何在多Topic的狀況下,依然還能很好的保持較多的吞吐量呢?咱們首先來看一下RocketMQ中比較關鍵的文件:
這裏有四個目錄(這裏的解釋就直接用RocketMQ官方的了):
咱們發現咱們的消息主體數據並無像Kafka同樣寫入多個文件,而是寫入一個文件,這樣咱們的寫入IO競爭就很是小,能夠在不少Topic的時候依然保持很高的吞吐量。有同窗說這裏的ConsumeQueue寫是在不停的寫入呢,而且ConsumeQueue是以Queue維度來建立文件,那麼文件數量依然不少,在這裏ConsumeQueue的寫入的數據量很小,每條消息只有20個字節,30W條數據也才6M左右,因此其實對咱們的影響相對Kafka的Topic之間影響是要小不少的。咱們整個的邏輯能夠以下:
Producer不斷的再往CommitLog添加新的消息,有一個定時任務ReputService會不斷的掃描新添加進來的CommitLog,而後不斷的去構建ConsumerQueue和Index。
注意:這裏指的都是普通的硬盤,在SSD上面多個文件併發寫入和單個文件寫入影響不大。
讀取消息
Kafka中每一個Partition都會是一個單獨的文件,因此當消費某個消息的時候,會很好的出現順序讀,咱們知道OS從物理磁盤上訪問讀取文件的同時,會順序對其餘相鄰塊的數據文件進行預讀取,將數據放入PageCache,因此Kafka的讀取消息性能比較好。
RocketMQ讀取流程以下:
ConsumerQueue也是每一個Queue一個單獨的文件,而且其文件體積小,因此很容易利用PageCache提升性能。而CommitLog,因爲同一個Queue的連續消息在CommitLog實際上是不連續的,因此會形成隨機讀,RocketMQ對此作了幾個優化:
咱們首先須要選擇一種集羣模式,來適應咱們可忍耐的可用程度,通常來講分爲三種:
通常來講投入生產環境的話都會選擇第四種,來保證最高的可用性。
當咱們選擇好了集羣模式以後,那麼咱們須要關心的就是怎麼去存儲和複製這個數據,rocketMQ對消息的刷盤提供了同步和異步的策略來知足咱們的,當咱們選擇同步刷盤以後,若是刷盤超時會給返回FLUSH_DISK_TIMEOUT,若是是異步刷盤不會返回刷盤相關信息,選擇同步刷盤能夠盡最大程度知足咱們的消息不會丟失。
除了存儲有選擇以後,咱們的主從同步提供了同步和異步兩種模式來進行復制,固然選擇同步能夠提高可用性,可是消息的發送RT時間會降低10%左右。
咱們上面對於master-slave部署模式已經作了不少分析,咱們發現,當master出現問題的時候,咱們的寫入怎麼都會不可用,除非恢復master,或者手動將咱們的slave切換成master,致使了咱們的Slave在多數狀況下只有讀取的做用。RocketMQ在最近的幾個版本中推出了Dleger-RocketMQ,使用Raft協議複製CommitLog,而且自動進行選主,這樣master宕機的時候,寫入依然保持可用。
有關Dleger-RocketMQ的信息更多的能夠查看這篇文章:Dledger-RocketMQ 基於Raft協議的commitlog存儲庫。
定時消息和延時消息在實際業務場景中使用的比較多,好比下面的一些場景:
在開源版本的RocketMQ中延時消息並不支持任意時間的延時,須要設置幾個固定的延時等級,目前默認設置爲:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,從1s到2h分別對應着等級1到18,而阿里雲中的版本(要付錢)是能夠支持40天內的任什麼時候刻(毫秒級別)。咱們先看下在RocketMQ中定時任務原理圖:
能夠看見延時消息是利用新建單獨的Topic和Queue來實現的,若是咱們要實現40天以內的任意時間度,基於這種方案,那麼須要402460601000個queue,這樣的成本是很是之高的,那阿里雲上面的支持任意時間是怎麼實現的呢?這裏猜想是持久化二級TimeWheel時間輪,二級時間輪用於替代咱們的ConsumeQueue,保存Commitlog-Offset,而後經過時間輪不斷的取出當前已經到了的時間,而後再次投遞消息。具體的實現邏輯須要後續會單獨寫一篇文章。
事務消息一樣的也是RocketMQ中的一大特點,其能夠幫助咱們完成分佈式事務的最終一致性,有關分佈式事務相關的能夠看我之前的不少文章都有不少詳細的介紹,這裏直接關注公衆號:咖啡拿鐵。
具體使用事務消息步驟以下:
事務消息的使用整個流程相對以前幾種消息使用比較複雜,下面是事務消息實現的原理圖:
咱們發現RocketMQ實現事務消息也是經過修改原Topic信息,和延遲消息同樣,而後模擬成消費者進行消費,作一些特殊的業務邏輯。固然咱們還能夠利用這種方式去作RocketMQ更多的擴展。
這裏讓咱們在回到文章中提到的幾個問題:
想必讀完這篇文章,你心中已經有答案。這篇文章主要講了RocketMQ全面的設計架構,若是你尚未看夠,那麼就請關注個人公衆號吧。
若是你們以爲這篇文章對你有幫助,你的關注和轉發是對我最大的支持,O(∩_∩)O: