RocketMq中MessageQueue的分配

都知道Rocketmq中有ConsumerGroup的概念。在集羣模式下,多臺服務器配置相同的ConsumerGroup,可以使得每次只有一臺服務器消費消息(注意,但不保證只消費一次,存在網絡抖動的狀況)。那麼,筆者就很疑惑,Rocketmq是如何實現這個模式的?如何保證只有一臺服務器消費?java

雖然答案很簡單,但倒是一個很好的帶着問題看源碼的機會。web

RocketMq結構

RocketMQ架構示意圖

從圖中能夠看到,MQ主要投遞消息和拉取消息兩個環節。安全

衆多的架構都是順應時代潮流而來,Rocketmq的結構體系固然也不是阿里所首創的,而是依據AMQP協議而來。Rocketmq中的Producer,Broker,以及Consumer都是依據AMQP中的概念衍生出來的。因此這裏不妨講講AMQP(Advanced Message Queuing Protocal,高級消息隊列協議),便於你們更好的理解技術的發展過程。服務器

paper下載 http://www.amqp.org/specification/0-9-1/amqp-org-download網絡

  • Broker: 接收和分發的應用
  • Virtual host:出於多租戶和安全因素,把AMQP的基本組件劃分到一個虛擬分組中。各個租戶之間是網絡隔離的,相似Linux中的namespace概念(可自行Google)
  • Connection:publisher/consumer 和broker之間的TCP鏈接
  • Channel:是相較於Connection更加輕量的鏈接,是Connection上的邏輯鏈接
  • Exchange: 負責將message分發到不一樣的Queue中
  • Queue: 消息最終會落到Queue中,消息由Broker push給Consumer或者由Consumer來pull消息
  • Binding:exchange和queue之間的消息路由策略
AMQP架構示意圖

消息隊列的3大類型

固然基於這樣一個協議,不僅僅是RocketMq一個閃耀在消息隊列選型中,還有不一樣的消息隊列。架構

https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ框架

主要分爲了3大陣營:dom

  • 有Broker 重Topic流:kafka,JMS
  • 有Broker 輕Topic流: RocketMQ
  • 無Broker: ZeroMQ

固然,若是熟悉了AMQP協議,你也能夠選擇自研一個消息隊列ui

https://zhuanlan.zhihu.com/p/28967866this

瞭解了一些背景,來看下RocketMQ中消息的投遞過程。仍是那個具體的問題,RocketMQ是如何選擇一個隊列來投遞的呢?

Producer如何投遞消息到不一樣隊列

這裏提一下,RocketMq中全部關於生產者和消費者的代碼都在client包下。打開源碼,能夠看到Procuder下有個selector包,看到這個包是否是感到就是它的感受。

能夠看到selector下的三個類都是實現了MessageQueueSelector,來看下MessageQueueSelector的代碼。

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

public class MessageQueue {
	private String topic;
	private String brokerName;
	private int queueId;
}
複製代碼

看一下哪裏調用了MessageQueueSelector.select(),發現是DefaultMQProducerImpl,那麼能夠確認就是由MessageQueueSelector提供了選擇哪一個隊列。

RocketMq提供了3種不一樣的選擇隊列方式:

  • SelectMessageQueueByHash
  • SelectMessageQueueByMachineRoom
  • SelectMessageQueueByRandom

默認隊列數量

細心的同窗確定會問那麼隊列數量是無限大的嗎?這個能夠查閱RocketMq的使用手冊,默認的隊列數量是4 (defaultTopicQueueNums: 4),固然你也能夠選擇本身配置。

同時不知道有沒有同窗找錯地兒,筆者剛開始是找錯地兒了,在TopicPublishInfo中也找到了個selectOneMessageQueue,代碼以下。

public class TopicPublishInfo{
    // 不一樣版本,代碼有些不一樣,邏輯相似
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName != null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }

            return null;
        }
        else {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            return this.messageQueueList.get(pos);
        }
    }
}
複製代碼

查了下調用方發現是MQFaultStrategy,看來是Rocketmq消費失敗時候,會將消息從新投遞到不一樣的隊列,這樣在集羣模式下可以保證分佈到不一樣機器消費。(是否是還有疑惑,爲何能保證到不一樣機器,請往下看)

Consumer如何從消息隊列獲取消息

這裏是比較難理解的一步,首先查閱RocketMQ手冊能夠看到:

RocketMQ 的 Consumer 都是從 Broker 拉消息來消費,可是爲了能作到實時收消息,RocketMQ 使用長輪詢方式,能夠保證消息實時性同 Push 方式一致。返種長輪詢方式相似亍 Web QQ 收収消息機制。請參考如下信息瞭解更多。http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

雖然解釋的很詳細,可是對新手仍是不是很友好。簡單的來講,就是使用長輪詢,客戶端發起請求和服務端先鏈接上,可是若是服務端沒有數據,這是鏈接仍是hold住,當有數據push給客戶端的時候才關閉鏈接。這樣不但保證了消費者不會被上游的消息打垮,也保證了消息的實時性。

那麼還有個問題,Consumer如何從MessageQueue上拉取消息呢?是隨機拉嗎?

不妨來看下MQPullConsumer,DefaultMQPullConsumer就是繼承於它。

public class MQPullConsumer {

    // 拉消息,非阻塞
    // 
    // @param mq from which message queue
    // @param subExpression 訂閱的tag,只支持"tag1 || tag2 || tag3"
    // @param offset 標誌位
    // @param maxNums 消費最大數量
    PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
複製代碼

能夠看到MessageQueue是傳進來的,這就比較尷尬了,實在沒法理解是何時決定好從哪一個隊列拉取消息的。幸好有萬能的搜索引擎,

https://zhuanlan.zhihu.com/p/25140744

RocketMq有專門的類AllocateMessageQueueStrategy.class,就藏在Client.Consumer.rebalance包下。

  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

每一次Consumer數量的變動,都會觸發AllocateMessageQueueStrategy。也就是每一次Consumer拉取的隊列都是固定好的。

如今,在回過頭來看看第一張RocketMQ的架構圖,是否是以爲畫的很透徹。

總結

  1. 任何的框架都有它衍生變化的歷史,瞭解架構變化的歷史,才能更好的理解一個框架
  2. 好好研讀使用手冊,包含了不少架構的細節
  3. 帶着問題去研讀源碼
相關文章
相關標籤/搜索