Java工程師的進階之路 RocketMQ篇(一)

白菜Java自習室 涵蓋核心知識mysql

Java工程師的進階之路 RocketMQ篇(一)
Java工程師的進階之路 RocketMQ篇(二)
sql

1. RocketMQ 簡介

RocketMQ 前身叫作 MetaQ, 在 MeataQ 發佈 3.0 版本的時候更名爲 RocketMQ,其本質上的設計思路和 Kafka 相似,可是和 Kafka 不一樣的是其使用 Java 進行開發,因爲在國內的 Java 受衆羣體遠遠多於 Scala,因此 RocketMQ 是不少以 Java 語言爲主的公司的首選。一樣的 RocketMQ 和 Kafka 都是 Apache 基金會中的頂級項目,他們社區的活躍度都很是高,項目更新迭代也很是快。markdown

2. RocketMQ 架構圖

對於 RocketMQ 的架構圖,在大致上來看和 Kafka 並無太多的差異,可是在不少細節上是有不少差異的,接下來會一一進行講述。架構

3. RocketMQ 名詞解釋

RocketMQ 架構圖中多個 Producer,多個主 Broker,多個從 Broker,每一個 Producer 能夠對應多個 Topic,每一個 Consumer 也能夠消費多個 Topic。ide

Broker 信息會上報至 NameServer,Consumer 會從 NameServer 中拉取 Broker 和 Topic 的信息。post

  • Producer:消息生產者,向 Broker 發送消息的客戶端spa

  • Consumer:消息消費者,從 Broker 讀取消息的客戶端設計

  • Broker:消息中間的處理節點,這裏和 kafka 不一樣,kafka 的 Broker 沒有主從的概念,均可以寫入請求以及備份其餘節點數據,RocketMQ 只有主 Broker 節點才能寫,通常也經過主節點讀,當主節點有故障或者一些其餘特殊狀況纔會使用從節點讀,有點相似- 於 mysql 的主從架構。3d

  • Topic:消息主題,一級消息類型,生產者向其發送消息, 消費者讀取其消息。code

  • Group:分爲 ProducerGroup, ConsumerGroup, 表明某一類的生產者和消費者,通常來講同一個服務能夠做爲 Group, 同一個 Group 通常來講發送和消費的消息都是同樣的。

  • Tag:Kafka 中沒有這個概念,Tag 是屬於二級消息類型,通常來講業務有關聯的可使用同一個 Tag, 好比訂單消息隊列,使用 Topic_Order, Tag 能夠分爲 Tag_食品訂單, Tag_服裝訂單等等。

  • Queue: 在 kafka 中叫 Partition, 每一個 Queue 內部是有序的,在 RocketMQ 中分爲讀和寫兩種隊列,通常來講讀寫隊列數量一致,若是不一致就會出現不少問題。

  • NameServer:Kafka 中使用的是 ZooKeeper 保存 Broker 的地址信息,以及 Broker 的 Leader 的選舉,在 RocketMQ 中並無採用選舉 Broker 的策略,因此採用了無狀態的 NameServer 來存儲,因爲NameServer 是無狀態的,集羣節點之間並不會通訊,因此上傳數據的時候都須要向全部節點進行發送。

不少朋友都在問什麼是無狀態呢?狀態的有無實際上就是數據是否會作存儲,有狀態的話數據會被持久化,無狀態的服務能夠理解就是一個內存服務,NameServer 自己也是一個內存服務,全部數據都存儲在內存中,重啓以後都會丟失。

4. RocketMQ Topic和Queue

在 RocketMQ 中的每一條消息,都有一個 Topic,用來區分不一樣的消息。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生產者寫入的新消息。

在 Topic 中有分爲了多個 Queue,這實際上是咱們發送/讀取消息通道的最小單位,咱們發送消息都須要指定某個寫入某個 Queue,拉取消息的時候也須要指定拉取某個 Queue,因此咱們的順序消息能夠基於咱們的 Queue 維度保持隊列有序,若是想作到全局有序那麼須要將 Queue 大小設置爲1,這樣全部的數據都會在Queue 中有序。

在上圖中咱們的 Producer 會經過一些策略進行 Queue 的選擇:

  • 非順序消息:非順序消息通常直接採用輪訓發送的方式進行發送。

  • 順序消息:根據某個 Key 好比咱們常見的訂單Id, 用戶Id,進行 Hash,將同一類數據放在同一個隊列中,保證咱們的順序性。

咱們同一組 Consumer 也會根據一些策略來選 Queue,常見的好比平均分配或者一致性 Hash 分配。 要注意的是當 Consumer 出現下線或者上線的時候,這裏須要作重平衡,也就是 Rebalance,RocketMQ 的重平衡機制以下:

  1. 定時拉取 broker, topic 的最新信息
  2. 每隔 20s 作重平衡
  3. 隨機選取當前 Topic 的一個主 Broker,這裏要注意的是否是每次重平衡全部主 Broker 都會被選中,由於會存在一個 Broker 再多個 Broker 的狀況。
  4. 獲取當前 Broker,當前 ConsumerGroup 的全部機器ID。
  5. 而後進行策略分配。

因爲重平衡是定時作的,因此這裏有可能會出現某個 Queue 同時被兩個 Consumer 消費,因此會出現消息重複投遞

Kafka 的重平衡機制和 RocketMQ 不一樣,Kafka 的重平衡是經過 Consumer 和 Coordinator 聯繫來完成的,當 Coordinator 感知到消費組的變化,會在心跳過程當中發送重平衡的信號,而後由一個ConsumerLeader 進行重平衡選擇,而後再由 Coordinator 將結果通知給全部的消費者。

Queue 讀寫數量不一致怎麼辦?

在 RocketMQ 中 Queue 被分爲讀和寫兩種,在最開始接觸 RocketMQ 的時候一直覺得讀寫隊列數量配置不一致不會出現什麼問題的,好比當消費者機器不少的時候咱們配置不少讀的隊列,可是實際過程當中發現會出現消息沒法消費和根本沒有消息消費的狀況。

  1. 當寫的隊列數量大於讀的隊列的數量,當大於讀隊列這部分ID的寫隊列的數據會沒法消費,由於不會將其分配給消費者。
  2. 當讀的隊列數量大於寫的隊列數量,那麼多的隊列數量就不會有消息被投遞進來。

5. RocketMQ 入門實例

5.1. RocketMQ 生產者

直接定義好一個producer,建立好Message,調用send方法便可。

public class Producer {

    public static void main(String[] args)
            throws MQClientException,
            InterruptedException {
        DefaultMQProducer producer = new
                DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes
                                    (RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
    
}
複製代碼

5.2. RocketMQ 消費者

public class PushConsumer {

    public static void main(String[] args)
            throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere
                (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener
                (new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus
                    consumeMessage(List<MessageExt> msgs,
                                   ConsumeConcurrentlyContext context) {
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    
}
複製代碼

Java工程師的進階之路 RocketMQ篇(一)
Java工程師的進階之路 RocketMQ篇(二)

相關文章
相關標籤/搜索