消息中間件—RocketMQ消息消費(二)(push模式實現)服務器
消息中間件—RocketMQ消息消費(三)(消息消費重試)網絡
由消息中間件(MQ消息服務器代理)主動地將消息推送給消費者;採用Push方式,能夠儘量實時地將消息發送給消費者進行消費。可是,在消費者的處理消息的能力較弱的時候(好比,消費者端的業務系統處理一條消息的流程比較複雜,其中的調用鏈路比較多致使消費時間比較久。歸納起來地說就是「慢消費問題」),而MQ不斷地向消費者Push消息,消費者端的緩衝區可能會溢出,致使異常;負載均衡
由消費者客戶端主動向消息中間件(MQ消息服務器代理)拉取消息;採用Pull方式,如何設置Pull消息的頻率須要重點去考慮,舉個例子來講,可能1分鐘內連續來了1000條消息,而後2小時內沒有新消息產生(歸納起來講就是「消息延遲與忙等待」)。若是每次Pull的時間間隔比較久,會增長消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,可是在一段時間內MQ中並無任何消息能夠消費,那麼會產生不少無效的Pull請求的RPC開銷,影響MQ總體的網絡性能;異步
RocketMQ的消費方式都是基於拉模式拉取消息的,而在這其中有一種長輪詢機制(對普通輪詢的一種優化),來平衡上面Push/Pull模型的各自缺點。基本設計思路是:消費者若是第一次嘗試Pull消息失敗(好比:Broker端沒有能夠消費的消息),並不當即給消費者客戶端返回Response的響應,而是先hold住而且掛起請求(將請求保存至pullRequestTable本地緩存變量中),而後Broker端的後臺獨立線程—PullRequestHoldService會從pullRequestTable本地緩存變量中不斷地去取,具體的作法是查詢待拉取消息的偏移量是否小於消費隊列最大偏移量,若是條件成立則說明有新消息達到Broker端(這裏,在RocketMQ的Broker端會有一個後臺獨立線程—ReputMessageService不停地構建ConsumeQueue/IndexFile數據,同時取出hold住的請求並進行二次處理),則經過從新調用一次業務處理器—PullMessageProcessor的處理請求方法—processRequest()來從新嘗試拉取消息(此處,每隔5S重試一次,默認長輪詢總體的時間設置爲30s)。
RocketMQ消息Pull的長輪詢機制的關鍵在於Broker端的PullRequestHoldService和ReputMessageService兩個後臺線程。性能
從嚴格意義上說,RocketMQ並無實現真正的消息消費的Push模式,而是對Pull模式進行了必定的優化,一方面在Consumer端開啓後臺獨立的線程—PullMessageService不斷地從阻塞隊列—pullRequestQueue中獲取PullRequest請求並經過網絡通訊模塊發送Pull消息的RPC請求給Broker端。另一方面,後臺獨立線程—rebalanceService根據Topic中消息隊列個數和當前消費組內消費者個數進行負載均衡,將產生的對應PullRequest實例放入阻塞隊列—pullRequestQueue中。這裏算是比較典型的生產者-消費者模型,實現了準實時的自動消息拉取。而後,再根據業務反饋是否成功消費來推進消費進度。
在Broker端,PullMessageProcessor業務處理器收到Pull消息的RPC請求後,經過MessageStore實例從commitLog獲取消息。如1.2節內容所述,若是第一次嘗試Pull消息失敗(好比Broker端沒有能夠消費的消息),則經過長輪詢機制先hold住而且掛起該請求,而後經過Broker端的後臺線程PullRequestHoldService從新嘗試和後臺線程ReputMessageService的二次處理。優化
RabbitMQ提供了消息確認機制。消費者在訂閱隊列時,能夠在代碼中手動設置autoAck參數爲false,這時RabbitMQ會等待消費者顯式地回覆確認信號(即爲顯式地調用channel.basicAck(envelope.getDeliveryTag(), false)方法)後才從集羣中的內存(或磁盤)節點上移除消息,從而保證了這條消息不會由於消費失敗而致使丟失。ui
在Kafka中,也能夠採用上面那種的消費後的確認機制,經過在Consumer端設置「enable.auto.commit」屬性爲false後,待業務工程正常處理完消費後,在代碼中手動調用KafkaConsumer實例的commitSync()方法提交(ps:這裏指的是同步阻塞commit消費的偏移量,等待Broker端的返回響應,須要注意Broker端在對commit請求作出響應以前,消費端會處於阻塞狀態,從而限制消息的處理性能和總體吞吐量),以確保消息可以正常被消費。若是在消費過程當中,消費端忽然Crash,這時候消費偏移量沒有commit,等正常恢復後依然還會處理剛剛未commit的消息。this
若是Consumer端由於各類類型異常致使本次消費失敗,爲防止該消息丟失而須要將其從新回發給Broker端保存,保存這種由於異常沒法正常消費而回發給MQ的消息隊列稱之爲重試隊列。RocketMQ會爲每一個消費組都設置一個Topic名稱爲「%RETRY%+consumerGroup」的重試隊列(這裏須要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每一個Topic設置的),用於暫時保存由於各類異常而致使Consumer端沒法消費的消息。考慮到異常恢復起來須要一些時間,會爲重試隊列設置多個重試級別,每一個重試級別都有與之對應的從新投遞延時,重試次數越多投遞延時就越大。RocketMQ對於重試消息的處理是先保存至Topic名稱爲「SCHEDULE_TOPIC_XXXX」的延遲隊列中,後臺定時任務按照對應的時間進行Delay後從新保存至「%RETRY%+consumerGroup」的重試隊列中(具體細節後面會詳細闡述)。spa
因爲有些緣由致使Consumer端長時間的沒法正常消費從Broker端Pull過來的業務消息,爲了確保消息不會被無端的丟棄,那麼超過配置的「最大重試消費次數」後就會移入到這個死信隊列中。在RocketMQ中,SubscriptionGroupConfig配置常量默認地設置了兩個參數,一個是retryQueueNums爲1(重試隊列數量爲1個),另一個是retryMaxTimes爲16(最大重試消費的次數爲16次)。Broker端經過校驗判斷,若是超過了最大重試消費次數則會將消息移至這裏所說的死信隊列。這裏,RocketMQ會爲每一個消費組都設置一個Topic命名爲「%DLQ%+consumerGroup"的死信隊列。通常在實際應用中,移入至死信隊列的消息,須要人工干預處理。
在業務工程中的Consumer端(Push消費模式下),若是消息可以正常消費須要在註冊的消息監聽回調方法中返回CONSUME_SUCCESS的消費狀態,不然由於各種異常消費失敗則返回RECONSUME_LATER的消費狀態。
若是業務工程對消息消費失敗了,那麼則會拋出異常而且返回這裏的RECONSUME_LATER狀態。這裏,在消費消息的服務線程—consumeMessageService中,將封裝好的消息消費任務ConsumeRequest提交至線程池—consumeExecutor異步執行。從消息消費任務ConsumeRequest的run()方法中會執行業務工程中註冊的消息監聽回調方法,並在processConsumeResult方法中根據業務工程返回的狀態(CONSUME_SUCCESS或者RECONSUME_LATER)進行判斷和作對應的處理(下面講的都是在消費通訊模式爲集羣模型下的,廣播模型下的比較簡單就再也不分析了)。
正常狀況下,設置ackIndex的值爲consumeRequest.getMsgs().size() - 1,所以後面的遍歷consumeRequest.getMsgs()消息集合條件不成立,不會調用回發消費失敗消息至Broker端的方法—sendMessageBack(msg, context)。最後,更新消費的偏移量。
異常狀況下,設置ackIndex的值爲-1,這時就會進入到遍歷consumeRequest.getMsgs()消息集合的for循環中,執行回發消息的方法—sendMessageBack(msg, context)。這裏,首先會根據brokerName獲得Broker端的地址信息,而後經過網絡通訊的Remoting模塊發送RPC請求到指定的Broker上,若是上述過程失敗,則建立一條新的消息從新發送給Broker,此時新消息的Topic爲「%RETRY%+ConsumeGroupName」—重試隊列的主題。其中,在MQClientAPIImpl實例的consumerSendMessageBack()方法中封裝了ConsumerSendMsgBackRequestHeader的請求體,隨後完成回發消費失敗消息的RPC通訊請求(業務請求碼爲:CONSUMER_SEND_MSG_BACK)。假若上面的回發消息流程失敗,則會延遲5S後從新在Consumer端進行從新消費。與正常消費的狀況同樣,在最後更新消費的偏移量。
Broker端收到這條Consumer端回發過來的消息後,經過業務請求碼(CONSUMER_SEND_MSG_BACK)匹配業務處理器—SendMessageProcessor來處理。在完成一系列的前置校驗(這裏主要是「消費分組是否存在」、「檢查Broker是否有寫入權限」、「檢查重試隊列數是否大於0」等)後,嘗試獲取重試隊列的TopicConfig對象(若是是第一次沒法獲取到,則調用createTopicInSendMessageBackMethod()方法進行建立)。根據回發過來的消息偏移量嘗試從commitlog日誌文件中查詢消息內容,若不存在則返回異常錯誤。
而後,設置重試隊列的Topic—「%RETRY%+consumerGroup」至MessageExt的擴展屬性「RETRY_TOPIC」中,並對根據延遲級別delayLevel和最大重試消費次數maxReconsumeTimes進行判斷,若是超過最大重試消費次數(默認16次),則會建立死信隊列的TopicConfig對象(用於後面將回發過來的消息移入死信隊列)。在構建完成須要落盤的MessageExtBrokerInner對象後,調用「commitLog.putMessage(msg)」方法作消息持久化。這裏,須要注意的是,在putMessage(msg)的方法裏會使用「SCHEDULE_TOPIC_XXXX」和對應的延遲級別隊列Id分別替換MessageExtBrokerInner對象的Topic和QueueId屬性值,並將原來設置的重試隊列主題(「%RETRY%+consumerGroup」)的Topic和QueueId屬性值作一個備份分別存入擴展屬性properties的「REAL_TOPIC」和「REAL_QID」屬性中。看到這裏也就大體明白了,回發給Broker端的消費失敗的消息並不是直接保存至重試隊列中,而是會先存至Topic爲「SCHEDULE_TOPIC_XXXX」的定時延遲隊列中。
RocketMQ的重試隊列的Topic是「%RETRY%+consumerGroup」,爲啥這裏要保存至Topic是「SCHEDULE_TOPIC_XXXX」的這個延遲隊列中呢?
在源碼中搜索下關鍵字—「SCHEDULE_TOPIC_XXXX」,會發現Broker端還存在着一個後臺服務線程—ScheduleMessageService(經過消息存儲服務—DefaultMessageStore啓動),經過查看源碼能夠知道其中有一個DeliverDelayedMessageTimerTask定時任務線程會根據Topic(「SCHEDULE_TOPIC_XXXX」)與QueueId,先查到邏輯消費隊列ConsumeQueue,而後根據偏移量,找到ConsumeQueue中的內存映射對象,從commitlog日誌中找到消息對象MessageExt,並作一個消息體的轉換(messageTimeup()方法,由定時延遲隊列消息轉化爲重試隊列的消息),再次作持久化落盤,這時候纔會真正的保存至重試隊列中。看到這裏就能夠解釋上面的疑問了,定時延遲隊列只是爲了用於暫存的,而後延遲一段時間再將消息移入至重試隊列中。
每一個Consumer實例在啓動的時候就默認訂閱了該消費組的重試隊列主題,DefaultMQPushConsumerImpl的copySubscription()方法中的相關代碼以下
private void copySubscription() throws MQClientException { //省略其餘代碼... switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING://若是消息消費模式爲集羣模式,還須要爲該消費組對應一個重試主題 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } //省略其餘代碼... }
所以,這裏也就清楚了,Consumer端會一直訂閱該重試隊列主題的消息,向Broker端發送以下的拉取消息的PullRequest請求,以嘗試從新再次消費重試隊列中積壓的消息。