都知道Rocketmq中有ConsumerGroup的概念。在集羣模式下,多臺服務器配置相同的ConsumerGroup,可以使得每次只有一臺服務器消費消息(注意,但不保證只消費一次,存在網絡抖動的狀況)。那麼,筆者就很疑惑,Rocketmq是如何實現這個模式的?如何保證只有一臺服務器消費?java
雖然答案很簡單,但倒是一個很好的帶着問題看源碼的機會。web
從圖中能夠看到,MQ主要投遞消息和拉取消息兩個環節。安全
衆多的架構都是順應時代潮流而來,Rocketmq的結構體系固然也不是阿里所首創的,而是依據AMQP協議而來。Rocketmq中的Producer,Broker,以及Consumer都是依據AMQP中的概念衍生出來的。因此這裏不妨講講AMQP(Advanced Message Queuing Protocal,高級消息隊列協議),便於你們更好的理解技術的發展過程。服務器
paper下載 http://www.amqp.org/specification/0-9-1/amqp-org-download網絡
固然基於這樣一個協議,不僅僅是RocketMq一個閃耀在消息隊列選型中,還有不一樣的消息隊列。架構
https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ框架
主要分爲了3大陣營:dom
固然,若是熟悉了AMQP協議,你也能夠選擇自研一個消息隊列ui
https://zhuanlan.zhihu.com/p/28967866this
瞭解了一些背景,來看下RocketMQ中消息的投遞過程。仍是那個具體的問題,RocketMQ是如何選擇一個隊列來投遞的呢?
這裏提一下,RocketMq中全部關於生產者和消費者的代碼都在client包下。打開源碼,能夠看到Procuder下有個selector包,看到這個包是否是感到就是它的感受。
能夠看到selector下的三個類都是實現了MessageQueueSelector,來看下MessageQueueSelector的代碼。
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
public class MessageQueue {
private String topic;
private String brokerName;
private int queueId;
}
複製代碼
看一下哪裏調用了MessageQueueSelector.select(),發現是DefaultMQProducerImpl,那麼能夠確認就是由MessageQueueSelector提供了選擇哪一個隊列。
RocketMq提供了3種不一樣的選擇隊列方式:
細心的同窗確定會問那麼隊列數量是無限大的嗎?這個能夠查閱RocketMq的使用手冊,默認的隊列數量是4 (defaultTopicQueueNums: 4),固然你也能夠選擇本身配置。
同時不知道有沒有同窗找錯地兒,筆者剛開始是找錯地兒了,在TopicPublishInfo中也找到了個selectOneMessageQueue,代碼以下。
public class TopicPublishInfo{
// 不一樣版本,代碼有些不一樣,邏輯相似
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
}
複製代碼
查了下調用方發現是MQFaultStrategy,看來是Rocketmq消費失敗時候,會將消息從新投遞到不一樣的隊列,這樣在集羣模式下可以保證分佈到不一樣機器消費。(是否是還有疑惑,爲何能保證到不一樣機器,請往下看)
這裏是比較難理解的一步,首先查閱RocketMQ手冊能夠看到:
RocketMQ 的 Consumer 都是從 Broker 拉消息來消費,可是爲了能作到實時收消息,RocketMQ 使用長輪詢方式,能夠保證消息實時性同 Push 方式一致。返種長輪詢方式相似亍 Web QQ 收収消息機制。請參考如下信息瞭解更多。http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
雖然解釋的很詳細,可是對新手仍是不是很友好。簡單的來講,就是使用長輪詢,客戶端發起請求和服務端先鏈接上,可是若是服務端沒有數據,這是鏈接仍是hold住,當有數據push給客戶端的時候才關閉鏈接。這樣不但保證了消費者不會被上游的消息打垮,也保證了消息的實時性。
那麼還有個問題,Consumer如何從MessageQueue上拉取消息呢?是隨機拉嗎?
不妨來看下MQPullConsumer,DefaultMQPullConsumer就是繼承於它。
public class MQPullConsumer {
// 拉消息,非阻塞
//
// @param mq from which message queue
// @param subExpression 訂閱的tag,只支持"tag1 || tag2 || tag3"
// @param offset 標誌位
// @param maxNums 消費最大數量
PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
複製代碼
能夠看到MessageQueue是傳進來的,這就比較尷尬了,實在沒法理解是何時決定好從哪一個隊列拉取消息的。幸好有萬能的搜索引擎,
https://zhuanlan.zhihu.com/p/25140744
RocketMq有專門的類AllocateMessageQueueStrategy.class,就藏在Client.Consumer.rebalance包下。
每一次Consumer數量的變動,都會觸發AllocateMessageQueueStrategy。也就是每一次Consumer拉取的隊列都是固定好的。
如今,在回過頭來看看第一張RocketMQ的架構圖,是否是以爲畫的很透徹。