文章摘要:在發送消息給RocketMQ後,消費者須要消費。消息的消費比發送要複雜一些,那麼RocketMQ是如何來作的呢?
在RocketMQ系列文章的前面幾篇幅中已經對其「RPC通訊部分」和「普通消息發送」兩部分進行了詳細的闡述,本文將主要從消息消費爲切入點簡要地介紹下「RocketMQ中Pull和Push的兩種消費方式」、「RocketMQ中消費者(Push模式)的啓動流程」和「RocketMQ中Pull和Push兩種消費方式的簡要流程」。在閱讀本篇以前但願讀者可以先仔細閱讀下關於RocketMQ分佈式消息隊列的前幾篇文章:
(1)消息中間件—RocketMQ的RPC通訊(一)
(2)消息中間件—RocketMQ的RPC通訊(二)
(3)消息中間件—RocketMQ消息發送緩存
對於任何一款消息中間件而言,消費者客戶端通常有兩種方式從消息中間件獲取消息並消費:
(1)Push方式:由消息中間件(MQ消息服務器代理)主動地將消息推送給消費者;採用Push方式,能夠儘量實時地將消息發送給消費者進行消費。可是,在消費者的處理消息的能力較弱的時候(好比,消費者端的業務系統處理一條消息的流程比較複雜,其中的調用鏈路比較多致使消費時間比較久。歸納起來地說就是「慢消費問題」),而MQ不斷地向消費者Push消息,消費者端的緩衝區可能會溢出,致使異常;
(2)Pull方式:由消費者客戶端主動向消息中間件(MQ消息服務器代理)拉取消息;採用Pull方式,如何設置Pull消息的頻率須要重點去考慮,舉個例子來講,可能1分鐘內連續來了1000條消息,而後2小時內沒有新消息產生(歸納起來講就是「消息延遲與忙等待」)。若是每次Pull的時間間隔比較久,會增長消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,可是在一段時間內MQ中並無任何消息能夠消費,那麼會產生不少無效的Pull請求的RPC開銷,影響MQ總體的網絡性能;服務器
思考題:
上面簡要說明了Push和Pull兩種消息消費方式的概念和各自特色。若是長時間沒有消息,而消費者端又不停的發送Pull請求不就會致使RocketMQ中Broker端負載很高嗎?那麼在RocketMQ中如何解決以作到高效的消息消費呢?網絡
經過研究源碼可知,RocketMQ的消費方式都是基於拉模式拉取消息的,而在這其中有一種長輪詢機制(對普通輪詢的一種優化),來平衡上面Push/Pull模型的各自缺點。基本設計思路是:消費者若是第一次嘗試Pull消息失敗(好比:Broker端沒有能夠消費的消息),並不當即給消費者客戶端返回Response的響應,而是先hold住而且掛起請求(將請求保存至pullRequestTable本地緩存變量中),而後Broker端的後臺獨立線程—PullRequestHoldService會從pullRequestTable本地緩存變量中不斷地去取,具體的作法是查詢待拉取消息的偏移量是否小於消費隊列最大偏移量,若是條件成立則說明有新消息達到Broker端(這裏,在RocketMQ的Broker端會有一個後臺獨立線程—ReputMessageService不停地構建ConsumeQueue/IndexFile數據,同時取出hold住的請求並進行二次處理),則經過從新調用一次業務處理器—PullMessageProcessor的處理請求方法—processRequest()來從新嘗試拉取消息(此處,每隔5S重試一次,默認長輪詢總體的時間設置爲30s)。
RocketMQ消息Pull的長輪詢機制的關鍵在於Broker端的PullRequestHoldService和ReputMessageService兩個後臺線程。對於RocketMQ的長輪詢(LongPolling)消費模式後面會專門詳細介紹。app
(1)Pull模式的Consumer端代碼以下:負載均衡
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: System.out.println(pullResult.getMsgFoundList().get(0).toString()); break; case NO_NEW_MSG: break SINGLE_MQ; case NO_MATCHED_MSG: case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { //TODO } } } consumer.shutdown();
在示例代碼中,能夠看到業務工程在Consumer啓動後,Consumer主動獲取MessageQueue的Set集合,遍歷該集合中的每個隊列,發送Pull的請求(參數中帶有隊列中的消息偏移量),同時須要Consumer端本身保存消息消費的offset偏移量至本地變量中。在Pull模式下,須要業務應用代碼自身去完成比較多的事情,所以在實際應用中用的較少。
(2)Push模式的Consumer端代碼以下:分佈式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest111", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setInstanceName("consumer1"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
在示例代碼中,業務工程的應用程序使用Push方式進行消費時,Consumer端註冊了一個監聽器,Consumer在收到消息後主動調用這個監聽器完成消費並進行對應的業務邏輯處理。因而可知,業務應用代碼只須要完成消息消費便可,無需參與MQ自己的一些任務處理(ps:業務代碼顯得更爲簡潔一些)。ide
這一節主要先講下RocketMQ消費者的啓動流程,看下在啓動的時候究竟完成了什麼樣的操做。因爲RocketMQ的DefaultMQPushConsumer和DefaultMQPullConsumer啓動流程大部分相似,而DefaultMQPushConsumer更爲複雜一些,所以這一節內容主要講的是DefaultMQPushConsumer啓動流程。Push方式的Consumer啓動流程的時序圖以下圖所示:性能
RocketMQ的PushConsumer啓動時序圖.jpgfetch
從上面的時序圖上能夠看出,Push方式的Consumer啓動流程完成的任務比較多,主要任務以下:
(1)設置consumerGroup、NameServer服務地址、消費起始偏移地址並根據參數Topic構建Consumer端的SubscriptionData(訂閱關係值);
(2)在Consumer端註冊消費者監聽器,當消息到來時完成消費消息;
(3)啓動defaultMQPushConsumerImpl實例,主要完成前置校驗、複製訂閱關係(將defaultMQPushConsumer的訂閱關係複製至rebalanceImpl中,包括retryTopic(重試主題)對應的訂閱關係)、建立MQClientInstance實例、設置rebalanceImpl的各個屬性值、pullAPIWrapper包裝類對象的初始化、初始化offsetStore實例並加載消費進度、啓動消息消費服務線程以及在MQClientInstance中註冊consumer等任務;
(4)啓動MQClientInstance實例,其中包括完成客戶端網絡通訊線程、拉取消息服務線程、負載均衡服務線程和若干個定時任務的啓動;
(5)向全部的Broker端發送心跳(採用加鎖方式);
(6)最後,喚醒負載均衡服務線程在Consumer端開始負載均衡;優化
RocketMQ提供了兩種消費模式,Push和Pull,大多數場景使用的是Push模式,在源碼中這兩種模式分別對應的是DefaultMQPushConsumer類和DefaultMQPullConsumer類。Push模式實際上在內部仍是使用的Pull方式實現的,經過Pull不斷地輪詢Broker獲取消息,當不存在新消息時,Broker端會掛起Pull請求,直到有新消息產生才取消掛起,返回新消息。
(1)RocketMQ的Pull消費模式流程簡析
RocketMQ的Pull模式相對來得簡單,從上面的demo代碼中能夠看出,業務應用代碼經過由Topic獲取到的MessageQueue直接拉取消息(最後真正執行的是PullAPIWrapper的pullKernelImpl()方法,經過發送拉取消息的RPC請求給Broker端)。其中,消息消費的偏移量須要Consumer端本身去維護。
(2)RocketMQ的Push消費模式流程簡析
在本文前面已經提到過了,從嚴格意義上說,RocketMQ並無實現真正的消息消費的Push模式,而是對Pull模式進行了必定的優化,一方面在Consumer端開啓後臺獨立的線程—PullMessageService不斷地從阻塞隊列—pullRequestQueue中獲取PullRequest請求並經過網絡通訊模塊發送Pull消息的RPC請求給Broker端。另一方面,後臺獨立線程—rebalanceService根據Topic中消息隊列個數和當前消費組內消費者個數進行負載均衡,將產生的對應PullRequest實例放入阻塞隊列—pullRequestQueue中。這裏算是比較典型的生產者-消費者模型,實現了準實時的自動消息拉取。而後,再根據業務反饋是否成功消費來推進消費進度。
在Broker端,PullMessageProcessor業務處理器收到Pull消息的RPC請求後,經過MessageStore實例從commitLog獲取消息。如1.2節內容所述,若是第一次嘗試Pull消息失敗(好比Broker端沒有能夠消費的消息),則經過長輪詢機制先hold住而且掛起該請求,而後經過Broker端的後臺線程PullRequestHoldService從新嘗試和後臺線程ReputMessageService的二次處理。
思考題
使用RocketMQ的Pull模式進行消息消費時,由上面可知該模式下無需自動拉取消息,這樣在DefaultMQPullConsumerImpl啓動時,消息拉取線程—PullMessageService和消息隊列負載線程—RebalanceService其實也就不必啓動,但實際上卻啓動了,這裏會有問題麼?
RocketMQ的消息消費(一)(入門篇幅)就先分析到這裏了。建議讀者能夠將做者以前寫的三篇文章—「RocketMQ的RPC通訊(一)/(二)」以及「RocketMQ消息發送」結合起來讀,這樣會總體會更加連貫,收穫更大。關於RocketMQ消息消費的內容比較多也比較複雜,涉及「Consumer端的負載均衡機制」、「RocketMQ的長輪詢機制」和「RocketMQ中Pull和Push消費模式的細節內容」將在後續篇幅進行介紹和分析。限於筆者的才疏學淺,對本文內容可能還有理解不到位的地方,若有闡述不合理之處還望留言一塊兒探討。
做者:癲狂俠 連接:https://www.jianshu.com/p/f071d5069059 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。