[RocketMQ]消息中間件—RocketMQ消息消費(二)(push模式實現)

摘要:在RocketMQ中,消息消費都是基於Pull消息方式,那麼Push模式中又是如何實現Consumer端準實時消費的呢?
在上一篇—「消息中間件—RocketMQ消息消費(一)」中,已經簡要地介紹了下RocketMQ中「Pull和Push兩種消費方式的簡要流程」以及「Push消費方式的啓動流程」(ps:若是不熟悉這幾塊內容的童鞋,能夠本身回顧下上一篇的內容)。本文將詳細介紹RocketMQ中Push消費方式下的「Pull消息的長輪詢機制」和「Consumer端的負載均衡機制」這兩塊關鍵核心內容。
因爲RocketMQ系列的技術分享存在必定的連續性,所以但願讀者能回顧下往期RocketMQ分享的篇幅:
(1)消息中間件—RocketMQ的RPC通訊(一)
(2)消息中間件—RocketMQ的RPC通訊(二)
(3)消息中間件—RocketMQ消息發送
(4)消息中間件—RocketMQ消息消費(一)算法

1、RocketMQ中長輪詢的Pull消息機制

在上一篇中,已經簡略地介紹過RocketMQ中消息消費時Pull消息的長輪詢機制了,其主要的思路是:Consumer若是第一次嘗試Pull消息失敗(好比:Broker端沒有能夠消費的消息),並不當即給消費者客戶端返回Response的響應,而是先hold住而且掛起請求。而後在Broker端,經過後臺獨立線程—PullRequestHoldService重複嘗試執行Pull消息請求來取消息。同時,另一個ReputMessageService線程不斷地構建ConsumeQueue/IndexFile數據,並取出hold住的Pull請求進行二次處理。經過這種長輪詢機制,便可解決Consumer端須要經過不斷地發送無效的輪詢Pull請求,而致使整個RocketMQ集羣中Broker端負載很高的問題。數組

1.1 Consumer向Broker端發送Pull消息請求的主要過程

在RocketMQ的Consumer端,後臺獨立線程服務—pullMessageService是Pull消息請求的發起者,它不斷地嘗試從阻塞隊列—LinkedBlockingQueue<PullRequest>中獲取元素PullRequest,並根據pullRequest中的參數以及訂閱關係信息調用pullAPIWrapper的pullKernelImpl()方法發送封裝後的Pull消息請求—PullMessageRequestHeader至Broker端來拉取消息(具體完成發送一次Pull消息的PRC通訊請求的是MQClientAPIImpl中的pullMessage()方法)。這裏涉及細節的時序圖(ps:時序圖中沒有涉及PRC異步通訊中的callback過程)以下:緩存

Consumer向Broker端發送長輪詢請求的時序圖.jpg網絡


其中, DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法是發送Pull消息請求的關鍵:
(1)校驗ProcessQueue是否「drop」, 若是爲「drop」爲true則直接返回(這個「drop」的設置在下面一節—「Consumer端的負載均衡機制」中會提到);
(2)給ProcessQueue設置Pull消息的時間戳;
(3)作流量控制,對於知足下面條件的任何一種狀況,稍後再發起Pull消息的請求;
條件1:正在消費的隊列中,未被消費的消息數和消息大小超過閥值(默認每一個隊列消息數爲1000個/消息存儲容量100MB);
條件2:若是是順序消費,正在消費的隊列中,消息的跨度超過閥值(默認2000);
(4)根據topic獲取訂閱關係—SubscriptionData;
(5)構建Pull消息的回調對象—PullBack,這裏從Broker端Pull消息的返回結果處理是經過異步回調(發送異步通訊RPC請求),其中若是Broker端返回Pull消息成功,在回調方法中先填充至處理隊列—processQueue中(將Pull下來的消息,設置到ProcessQueue的msgTreeMap容器中),而後經過消費消息的服務線程—consumeMessageService,將封裝好的ConsumeRequest提交至消費端消費線程池—consumeExecutor異步執行處理(具體處理邏輯:經過業務應用系統在DefaultMQPushConsumer實例中註冊的消息監聽器完成業務端的消息消費);
(6)從Consumer端內存中獲取commitOffsetValue;
(7)經過RocketMQ的Remoting通訊層向Broker端發送Pull消息的RPC請求;app

 

1.2 Broker端處理Pull消息請求的通常過程

這裏先來講下對於通常狀況下(即爲所要Pull的消息在RocketMQ的Broker端已是存在,通常能夠Pull到的狀況),Broker端處理這個Pull消息請求的主要過程。其時序圖(ps:圖中只是畫了大部分的流程,詳細細節還須要對照源碼看下)以下:負載均衡

Broker端接受長輪詢請求的處理時序圖.jpg異步


從上面的簡易時序圖中能夠看到Broker端Pull消息的主要關鍵點以下:
(1)Pull消息的業務處理器—PullMessageProcessor的processRequest爲處理拉取消息請求的入口,在設置reponse返回結果中的opaque值後,就完成一些前置的校驗(Broker是否可讀、Topic/ConsumerGroup是否存在、讀取隊列Id是否在Topic配置的隊列範圍數內);
(2)根據「ConsumerGroup」、「Topic」、「queueId」和「offset」這些參數來調用MessageStore實例的getMessage()方法來產嘗試讀取Broker端的消息;
(3)其中,經過findConsumeQueue()方法,獲取邏輯消費隊列—ConsumeQueue;
(4)根據offset與邏輯消費隊列中的maxOffset、minOffset的比較,來設置狀態值status,同時計算出下次Pull消息的開始偏移量值—nextBeginOffset,而後經過MappedFile的方式獲取ConsumeQueue的Buffer映射結果值;
(5)根據算出來的offsetPy(物理偏移量值)和sizePy(消息的物理大小),從commitLog獲取對應消息的Buffer映射結果值,並填充至GetMessageResult返回對象,並設置返回結果(狀態/下次其實偏移量/maxOffset/minOffset)後return;
(6)根據isTransferMsgByHeap的設置狀況(默認爲true),選擇下面兩種方式之一來真正讀取GetMessageResult的消息內容並返回至Consumer端;
方式1:使用JDK NIO的ByteBuffer,循環地讀取存有消息內容的messageBufferList至堆內內存中,返回byte[]字節數組,並設置到響應的body中;而後,經過RPC通訊組件—NettyRemotingServer發送響應至Consumer端;
方式2:採用基於Zero-Copy的Netty組件的FileRegion,其包裝的「FileChannel.tranferTo」實現文件傳輸,能夠直接將文件緩衝區的數據發送至通訊目標通道Channel中,避免了經過循環write方式致使的內存拷貝開銷,這種方式性能上更優;
(7)在PullMessageProcessor業務處理器的最後,提交併持久化消息消費的offset偏移量進度;ide

 

1.3 Broker端對於Pull請求掛起處理的流程

說完了Pull消息請求的通常流程,下面主要看下Broker端的PullMessageProcessor業務處理器在RocketMQ中尚未消息能夠拉取狀況下(即爲:PULL_NOT_FOUND)的處理流程,本節內容也是RocketMQ中長輪詢機制的關鍵。
長輪詢機制是對普通輪詢的一種優化方案,它平衡了傳統Push/Pull模型的各自缺點,Server端若是當前沒有Client端請求拉取的相關數據會hold住這個請求,直到Server端存在相關的數據,或者等待超時時間後返回。在響應返回後,Client端又會再次發起下一次的長輪詢請求。RocketMQ的push模式正是採用了這種長輪詢機制的設計思路,若是在上面所述的第一次嘗試Pull消息失敗後(好比Broker端暫時沒有能夠消費的消息),先hold住而且掛起該請求(這裏,設置返回響應response爲null,此時不會向Consumer端發送任何響應的內容,即不會對響應結果進行處理),而後經過Broker端的後臺線程PullRequestHoldService從新嘗試和後臺線程ReputMessageService的二次處理。在Broker端,兩個後臺線程服務PullRequestHoldService和ReputMessageService是實現長輪詢機制的關鍵點。下面就來分別介紹這兩個服務線程:
(1)PullRequestHoldService:該服務線程會從pullRequestTable本地緩存變量中取PullRequest請求,檢查輪詢條件—「待拉取消息的偏移量是否小於消費隊列最大偏移量」是否成立,若是條件成立則說明有新消息達到Broker端,則經過PullMessageProcessor的executeRequestWhenWakeup()方法從新嘗試發起Pull消息的RPC請求(此處,每隔5S重試一次,默認長輪詢總體的時間設置爲30s);
(2)ReputMessageService:該服務線程會在Broker端不斷地從數據存儲對象—commitLog中解析數據並分發請求,隨後構建出ConsumeQueue(邏輯消費隊列)和IndexFile(消息索引文件)兩種類型的數據。同時從本地緩存變量—pullRequestTable中,取出hold住的PullRequest請求並執行二次處理(具體的作法是,在PullMessageProcessor的executeRequestWhenWakeup()方法中,經過業務線程池pullMessageExecutor,異步提交從新Pull消息的請求任務,即爲從新調了一次PullMessageProcessor業務處理器的processRequest()方法,來實現Pull消息請求的二次處理)。這裏,ReputMessageService服務線程,每處理一次,Thread.sleep(1),繼續下一次處理。性能

2、Consumer端的負載均衡機制

看了上面一節—「RocketMQ中長輪詢的Pull消息機制」後,你們可能會有這樣子一個疑問:在Consumer端pullMessageService線程做爲消息的主動拉取者不斷地從阻塞隊列中獲取元素PullRequest,那麼這裏的PullRequest是在哪兒由哪一個線程放入至阻塞隊列中的呢?本節內容將介紹「Consumer端的負載均衡機制」,同時解答上面的疑問。優化

2.1 RocketMQ爲什麼須要在Consumer端作負載均衡?

在RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基於拉模式來Pull消息的,而在Push模式中只是採用了長輪詢的方式而實現了準實時的自動消息拉取。在兩種基於拉模式的消費方式(Push/Pull)中,均須要Consumer端在知道從Broker端的哪個消息隊列—MessageQueue中去Pull消息。所以,消息隊列的負載均衡處理(即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費),由Consumer端來主動完成更爲合理。

2.2 Consumer端負載均衡的主要流程

1. Consumer端的心跳包發送
在Consumer啓動後,它就會經過定時任務不斷地向RocketMQ集羣中的全部Broker實例發送心跳包(其中包含了,消息消費分組名稱、訂閱關係集合、消息通訊模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息後,會將它維護在ConsumerManager的本地緩存變量—consumerTable,同時並將封裝後的客戶端網絡通道信息保存在本地緩存變量—channelInfoTable中,爲以後作Consumer端的負載均衡提供能夠依據的元數據信息。
2. Consumer端實現負載均衡的核心類—RebalanceImpl
在上一篇文章的"Consumer啓動流程"中已經介紹了在啓動MQClientInstance實例時候,會完成負載均衡服務線程—RebalanceService的啓動(每隔20s執行一次)。經過查看源碼能夠發現,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法,該方法是實現Consumer端負載均衡的核心關鍵。
這裏,rebalanceByTopic()方法會根據消費者通訊類型爲「廣播模式」仍是「集羣模式」作不一樣的邏輯處理。這裏主要來看下集羣模式下的主要處理流程:
(1)從rebalanceImpl實例的本地緩存變量—topicSubscribeInfoTable中,獲取該Topic主題下的消息消費隊列集合(mqSet);
(2)根據topic和consumerGroup爲參數調用mQClientFactory.findConsumerIdList()方法向Broker端發送獲取該消費組下消費者Id列表的RPC通訊請求(Broker端基於前面Consumer端上報的心跳包數據而構建的consumerTable作出響應返回,業務請求碼:GET_CONSUMER_LIST_BY_GROUP);
(3)先對Topic下的消息消費隊列、消費者Id排序,而後用消息隊列分配策略算法(默認爲:消息隊列的平均分配算法),計算出待拉取的消息隊列。

Consumer端負載均衡策略的分配.jpg


這裏的平均分配算法,相似於分頁的算法,將全部MessageQueue排好序相似於記錄,將全部消費端Consumer排好序相似頁數,並求出每一頁須要包含的平均size和每一個頁面記錄的範圍range,最後遍歷整個range而計算出當前Consumer端應該分配到的記錄(這裏即爲:MessageQueue)。具體的算法代碼以下:

 

@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        //省略代碼......
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        //省略代碼......
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;

(4)而後,調用updateProcessQueueTableInRebalance()方法,具體的作法是,先將分配到的消息隊列集合(mqSet)與processQueueTable作一個過濾比對,具體的過濾比對方式以下圖:

RebalancePushImpl負載均衡(分發pullRequest到pullRequestQueue).jpg


這裏能夠分以下兩種狀況來篩選過濾:
a.圖中processQueueTable標註的紅色部分,表示與分配到的消息隊列集合mqSet互不包含。將這些隊列設置Dropped屬性爲true,而後查看這些隊列是否能夠移除出processQueueTable緩存變量,這裏具體執行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否能夠獲取當前消費處理隊列的鎖,拿到的話返回true。若是等待1s後,仍然拿不到當前消費處理隊列的鎖則返回false。若是返回true,則從processQueueTable緩存變量中移除對應的Entry;
b.圖中processQueueTable的綠色部分,表示與分配到的消息隊列集合mqSet的交集。判斷該ProcessQueue是否已通過期了,在Pull模式的不用管,若是是Push模式的,設置Dropped屬性爲true,而且調用removeUnnecessaryMessageQueue()方法,像上面同樣嘗試移除Entry;
最後,爲過濾後的消息隊列集合(mqSet)中的每一個MessageQueue建立一個ProcessQueue對象並存入RebalanceImpl的processQueueTable隊列中(其中調用RebalanceImpl實例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對象的下一個進度消費值offset,隨後填充至接下來要建立的pullRequest對象屬性中),並建立拉取請求對象—pullRequest添加到拉取列表—pullRequestList中,最後執行dispatchPullRequest()方法,將Pull消息的請求對象PullRequest依次放入PullMessageService服務線程的阻塞隊列pullRequestQueue中,待該服務線程取出後向Broker端發起Pull消息的請求。其中,能夠重點對比下,RebalancePushImpl和RebalancePullImpl兩個實現類的dispatchPullRequest()方法不一樣,RebalancePullImpl類裏面的該方法爲空,這樣子也就回答了上一篇中最後的那道思考題了。

 

3、總結

RocketMQ的消息消費(二)(push模式實現)篇幅就先分析到這裏了。關於RocketMQ消息消費的內容比較多也比較複雜,須要讀者結合源碼並屢次debug才能對其有一個較爲深入的理解。另外,對於消息消費部分的「「消息ACK機制」、「消費重試機制」等剩餘內容將在後續的篇幅進行介紹和分析。限於筆者的才疏學淺,對本文內容可能還有理解不到位的地方,若有闡述不合理之處還望留言一塊兒探討。

做者:癲狂俠 連接:https://www.jianshu.com/p/fac642f3c1af 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

相關文章
相關標籤/搜索