Name Server:維護broker的地址列表,以及topic和topic對應的隊列的地址列表。每一個broker與每一個Name Server之間使用長鏈接來保持心跳,並向其定時註冊topic信息。能夠從兩個維度來理解Name Server的能力: 1)Name Server能夠提供一個特定的topic對應的broker地址列表;2)Name Server能夠提供一臺broker上包含的全部topic列表。輕量級的名稱服務。幾乎無狀態的節點,相互之間不會有數據同步。 主要提供topic路由信息的註冊及查詢。(MetaQ 1.x和MetaQ 2.x是依賴ZooKeeper的,因爲ZooKeeper功能太重,RMetaQ 3.x去掉了對ZooKeeper依賴,採用本身的NameServer)java
Producer和集羣中某一個Name Server間採用長鏈接,Producer按期從Name Server中獲取到Topic對應的broker地址列表,即Topic路由信息,並緩存在本地,而後選擇一臺合適的master broker發佈消息。 注意producer發佈消息是將消息發佈到與對應的master broker上,再由master broker同步到slave broker上。web
Consumer和集羣中某一個Name Server間採用長鏈接,並按期從Name Server中獲取到Topic路由信息,而後選擇合適的broker拉取消息進行消費。數據庫
在 Metaq2.x 以前版本,隊列也稱爲「分區」,二者描述的是一個概念。 可是按照 2.x 的實現,使用隊列描 述更合適。json
數據存儲分爲兩級,物理隊列+邏輯隊列。
物理隊列:一個broker只有一個物理隊列,全部發到broker的數據都會順序寫入該隊列,當一個文件被寫滿時(默認爲1G),會新建文件繼續寫入。
邏輯隊列:當consumer消費數據時,consumer先根據nameServer提供的路由信息定位到broker,再從broker的消費隊列讀取index,從而定位到物理隊列的位置。一個topic有多個分區,每一個分區對應一個消費隊列,而消費隊列由index組成。數組
commitlog消息文件:broker在接收到生產者發送來的消息後,是如何對其進行存儲的呢?在MetaQ中,真正的消息自己其實是存放在broker本地一個名爲commitlog的文件中的,而且這個commitlog的寫入是不區分Topic的,即不論什麼Topic的消息,都會在接收到以後順序寫入commitlog文件中,commitlog的文件名就是起始字節位置 寫滿後,產生一個新的文件。緩存
索引文件:讀取的時候又是怎麼從commitlog中找到消息的呢?的確,僅僅只存儲消息自己是沒法作到這個的(由於在僅有commitlog文件的前提下,消息的長度、類型等信息都是沒法肯定的),因此MetaQ還有索引文件(在一些文檔中也稱爲Message Queue)。broker將消息存儲到commitlog文件後,會將該消息在文件的物理位置(offset),消息大小,消息類型等信息封裝成一個固定大小的數據結構,稱爲索引單元。其中,offset是java long型,有64位,從理論上講,offset在100年內都不會發生溢出,因此能夠認爲message queue長度無限。從而簡單地,能夠把message queue理解爲是一個長度無限的數組,offset就是下標。多個索引單元組成一個索引文件,和commitlog文件同樣,文件名是起始字節位置,寫滿後,產生一個新的文件。服務器
broker 將消息存儲到文件後,會將該消息在文件的物理位置,消息大小,消息類型封裝成一個固定大 小的數據結構,暫且稱這個數據結構爲索引單元吧,大小固定爲 16k,消息在物理文件的位置稱爲 offset。
多個索引單元組成了一個索引文件,索引文件默認固定大小爲 20M,和消息文件同樣,文件名是 起始字節位置,寫滿後,產生一個新的文件。網絡
metaq的消息存儲由commit log和邏輯隊列consume queue配合完成。 首先會將全部的消息分topic存在commit log文件中,commit log文件最大爲1G,超過1G會生成新文件,文件以起始字節大小命名。consume queue邏輯隊列至關因而commit log文件的索引,記錄offset偏移量,size長度,消息的hashcode等信息。物理隊列只有一個(也就是commit log文件),採用固定大小的文件順序存儲消息。邏輯隊列有多個(每一個對應一個topic),每一個邏輯隊列有多個分區(topicA_1分區,topicA_2分區,topicA_3分區),每一個分區有多個索引單元。MetaQ中將每個Topic分爲了幾個區,每一個區對應了一個消費隊列,這些消費隊列就由各個索引文件組成。消費端在拉取消息時,只要知道本身是訂閱的Topic從nameserver獲取broker地址創建鏈接後,就能根據消費隊列中的索引文件,去物理隊列中獲取訂閱的消息。以下圖所示,topicA在broker1 和 broker2分別有topicA_1分區, topicA_2分區;topicA_3 分區,topicA_4 分區。每一個分區裏存是的commit log文件的索引信息。消費信息時,須要經過索引信息,到commit log文件中獲取真正的數據信息進行消費。仔細觀察圖metaq_arch_1/2/3.png數據結構
offset:這個概念其實放在這裏講略微有些早了,能夠在瞭解完MetaQ 的消息生產模型以後再來了解。兩個「offset」,剛開始仍是有點迷惑的,梳理以後才理出了頭緒,因而這裏把這兩種offset拿出來專門加以區分。併發
對於某一特定Topic而言,brokerId和分區號組合起來就是一個有序的分區列表, 如Topic 「hello」對應的有序分區列表爲{A-0,A-1,B-0,B-1,B-2},生產者生產消息實際上能夠理解爲是以topic下的分區爲單位進行的,即生產者按照必定規則向「brokerId-分區號」組成的有序分區列表對應的分區發送消息。發送的規則能夠定製,通常採用輪詢的方式。
消息的存儲就是藉助以前介紹過的commitlog文件和索引文件來實現的。
消費者消費消息也是以topic下的分區爲單位,分組內一個消費者對應消耗一個分區的消息。這樣一來就出現如下兩種狀況:
當Topic下的分區數足夠大的時候,能夠認爲消費者負載是平均分配的。
因而,消息的拉取過程描述以下:
(1) 根據Topic和分區號找到對應的邏輯消費隊列,記爲A;
(2) 根據A和offset找到對應的待消費的索引位置,記爲B;
(3) 從B開始,讀取B所對應的commitlog文件中的消息放入消費者隊列中,直到讀取到的消息長度等於給定的maxSize。在這個過程當中,offset同時後移更新。
(4) 返回結果,結果中包含更新後的offset值,並將offset保存下來做爲下一次消費開始的位置標誌。
首先,從以上截圖能夠驗證:
(1) 一個broker上能夠發佈多個Topic(圖中有TopicA、TopicB)
(2) 一個Topic下能夠對應多個分區(圖中TopicA下有0和1兩個分區,TopicB下有0,1,2三個分區)
(3) commitlog文件是不區分Topic的消息存儲文件,全部Topic下的消息都會順序寫到同一個commitlog文件中
(3) 不一樣的ConsumerGroup之間獨立進行消息的消費,消費過程組間互不影響。不一樣的消費者分組有本身獨有的消費隊列。
能夠看到,針對每個ConsumerGroup,都維護有一個重試隊列(RETRY Queue)和一個死信隊列(DLQ Queue)。 其中,重試隊列用於消費失敗後的重試,而且設置有最大重試次數(默認是16次),死信隊列存放屢次重試後依舊沒法消費的消息。消費者在消費消息的過程當中,若是消費失敗,則將這條消息加入到重試隊列中,由重試線程繼續進行重試消費,若是重試最大次數後仍是消費失敗,則將消息加入到死信隊列中,加入到死信隊列中的消息須要進行人工干預。在這個過程當中,主線程繼續日後推動,從而實現消息的亂序消費和相對順序消費。
當前例子是PushConsumer用法,使用方式給用戶感受是消息從MetaQ服務器推到了應用客戶端。 可是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ服務器拉消息,而後再回調用戶Listener方法
RocketMQ消息有有序保證明例:
@Test public void testQueueProducer() { try { MetaProducer producer = new MetaProducer(GROUP_NAME); //隊列的並行度是15個 producer.setDefaultTopicQueueNums(15); producer.start(); // 設置tag是爲了讓消費端過濾不想要的tag String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; //do { for (int i = 0; i < 50000; i++) { // 保證訂單ID相同的消息放在同一個隊列中 int orderId = i % 10; Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, ("Hello MetaQ " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //arg就是orderId Integer id = (Integer) arg; System.out.println(String.format("mq size=%d,arg=%d", mqs.size(), id)); // id即orderId取隊列大小的模 目的就是設置訂單相同放在一個隊列中 int index = id % mqs.size(); System.out.println("index=" + index); return mqs.get(index); } }, orderId); System.out.println(sendResult); } Thread.sleep(10000l); //} while (true); producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } // SendResult [sendStatus=SEND_OK, msgId=0A6549250000277400000AC0059D66F9, messageQueue=MessageQueue [topic=orderTestTopic, brokerName=taobaodaily-f, queueId=3], queueOffset=653] } @Test public void testOrderConsumerQueue() throws MQClientException, InterruptedException { MetaPushConsumer consumer = new MetaPushConsumer(GROUP_NAME); consumer.subscribe(TOPIC, "TagB"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); try { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + new String(msgs.get(0).getBody(), "UTF-8")); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); Thread.sleep(10000l); // ConsumeMessageThread_7 Receive New Messages: Hello MetaQ 67 }
@Test public void testQueueProducer() { try { MetaProducer producer = new MetaProducer(GROUP_NAME); //隊列的並行度是15個 producer.setDefaultTopicQueueNums(15); producer.start(); // 設置tag是爲了讓消費端過濾不想要的tag String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; //do { for (int i = 0; i < 50000; i++) { // 保證訂單ID相同的消息放在同一個隊列中 int orderId = i % 10; Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, ("Hello MetaQ " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //arg就是orderId Integer id = (Integer) arg; System.out.println(String.format("mq size=%d,arg=%d", mqs.size(), id)); // id即orderId取隊列大小的模 目的就是設置訂單相同放在一個隊列中 int index = id % mqs.size(); System.out.println("index=" + index); return mqs.get(index); } }, orderId); System.out.println(sendResult); } Thread.sleep(10000l); //} while (true); producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } // SendResult [sendStatus=SEND_OK, msgId=0A6549250000277400000AC0059D66F9, messageQueue=MessageQueue [topic=orderTestTopic, brokerName=taobaodaily-f, queueId=3], queueOffset=653] } @Test public void testOrderConsumerQueue() throws MQClientException, InterruptedException { MetaPushConsumer consumer = new MetaPushConsumer(GROUP_NAME); consumer.subscribe(TOPIC, "TagB"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); try { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + new String(msgs.get(0).getBody(), "UTF-8")); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); Thread.sleep(10000l); // ConsumeMessageThread_7 Receive New Messages: Hello MetaQ 67 }
從輸出結果能夠看出mq size=8, 分佈在兩臺機器上,brokerName=taobaodaily-05-tx, queueId=0/1/2/3, brokerName=taobaodaily-04-timer, queueId=0/1/2/3
總結
發送順序消息沒法利用集羣FailOver特性
消費順序消息的並行度依賴於隊列數量
隊列熱點問題,個別隊列因爲哈希不均致使消息過多,消費速度跟不上,產生消息堆積問題
遇到消息失敗的消息,沒法跳過,當前隊列消費暫停
發送消息負載均衡:發送消息經過輪詢隊列的方式 發送,每一個隊列接收平均的消息量。經過增長機器,能夠水平擴展隊列容量。 另外也能夠自定義方式選擇發往哪一個隊列。
訂閱消息負載均衡:若是有 5 個隊列,2 個 consumer,那麼第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。 這樣便可達到平均消費的目的,能夠水平擴展 Consumer 來提升消費能力。可是 Consumer 數量要小於等於隊列數 量,若是 Consumer 超過隊列數量,那麼多餘的 Consumer 將不能消費消息。
事務消息:
圖transaction
消息過濾:
(1). 在 Broker 端進行 Message Tag 比對,先遍歷 Consume Queue,若是存儲的 Message Tag 與訂閱的 Message Tag 不符合,則跳過,繼續比對下一個,符合則傳輸給 Consumer。注意:Message Tag 是字符串形式,ConsumeQueue 中存儲的是其對應的 hashcode,比對時也是比對 hashcode。
(2). Consumer 收到過濾後的消息後,一樣也要執行在 Broker 端的操做,可是比對的是真實的 Message Tag 字符串,而不是 Hashcode。
零拷貝原理:
圖zero_copy_1,zero_copy_2
在broker向消費端發送消息時,若採用傳統的IO方式b會從磁盤拷貝數據到頁緩存->用戶空間從頁緩存讀數據->用戶空間再將數據寫入socket緩存中->經過網絡發送數據,顯然這樣作使得用戶空間和內核空間產生了多餘的讀寫。MetaQ採用零拷貝的方式,經過mmap的方式使得頁緩存與用戶空間緩存共享數據,以後直接將數據從頁緩存中將數據傳入socket緩存中加快效率。
metaq如何解決事務問題:通常分佈式事務採用的kv存儲方式,經過key尋找message,第二階段的回滾或者提交須要修改消息狀態;然而metaq第一階段發送prepared消息,拿到offset,第二階段經過offset訪問消息,修改數據狀態。經過offset訪問更改數據的缺點是系統髒也過多。
消息無序性,如何保證其順序性: 消息的有序消費是指按照消息發送的前後順序消費。正常狀況下,單線程下produder同一個topic(一個topic邏輯上是一個隊列,物理上分爲多個隊列)下,metaq支持局部順序消費。分兩種順序消費方式普通順序消費和嚴格順序消費。普通順序消費,當一個broker宕機或者重啓時,容許該部分消息延遲消費,其餘broker照常消費;嚴格消費,當某broker宕機或者重啓時,犧牲分佈式的failover特性,整個集羣都不能使用,大大下降服務可用性,目前使用場景中數據庫binlog同步強依賴嚴格順序消費,其餘應用場景用普通順序消費可知足。然而在平時的應用中,單線程producer幾乎不可能,咱們能夠經過設計規避這個缺點。例如同一個訂單須要發送的建立訂單消息、訂單付款消息、訂單完成按消費順序消費纔有意義,按優先級發送消息(metaq按優先級發消息開銷比較大,沒有特地支持這個特性)同一個隊列,優先級高的消息先發送,能夠在消費端作異常狀況的處理邏輯。
非順序消息如何實現隊列內並行:PullService根據rebalance結果從對應的隊列中獲取數據,並將其緩存到客戶端本地內存,而後根據客戶端設置規則(一次批量消費消息的數目)將數據分紅多段派發給下游的消費線程。消費端一個隊列只維護一個offset,消費成功後只提交最小的offset。將拉回來的數據分紅了三段(0-10,10-20,20-30)分別派發給消費線程。 0-10,20-30這兩部分數據消費完成,但10-20這段數據還未消費完成。提交服務端消費位點則爲10(消費成功的是10和30),知道這段數據消費完成提交位點纔會爲30。
PS:因此併發消費有可能出現重複消費的問題。如中間有部分數據一直沒有被成功消費,此時從新負載,別的消費端拿到當前隊列就會致使重複消費。
最佳實踐
Producer最佳實踐
一、每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。因爲是哈希索引,請務必保證 key 儘量惟一,這樣能夠避免潛在的哈希衝突。
二、消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。
三、對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
四、某些應用若是不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。
Consumer最佳實踐 一、消費過程要作到冪等(即消費端去重) 二、儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量。 三、優化每條消息消費過程