必須先理解的RocketMQ入門手冊,才能再次深刻解讀

RocketMQ入門手冊

RocketMQ是一個分佈式、隊列模型的開源消息中間件,前身是MetaQ,是阿里研發的一個隊列模型的消息中間件,後開源給apache基金會成爲了apache的頂級開源項目,具備高性能、高可靠、高實時、分佈式特色,java

同時,普遍應用於多個領域,包括異步通訊解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通訊、移動應用、手遊、視頻、物聯網、車聯網等。數據庫

具備如下特色:apache

  • 可以保證嚴格的消息順序
  • 提供豐富的消息拉取模式
  • 高效的訂閱者水平擴展能力
  • 實時的消息訂閱機制
  • 億級消息堆積能力

RocketMQ 架構原理分析

RocketMQ 架構

img

NameServer (名稱服務器):服務器

  • 提供輕量級的服務發現和路由。NameServer接受來自Broker羣集的註冊,並提供檢測信號機制以檢查Broker是否還存在
  • 每一個NameServer記錄完整的路由信息(Broker 相關 Topic 等元信息,並給 Producer 提供 Consumer 查找 Broker 信息),提供相應的讀寫服務。

Broker(消息服務器): 消息存儲中心,接收來自 Producer 的消息並存儲, Consumer 從這裏取得消息架構

  • 單個Broker節點與全部的NameServer節點保持長鏈接及心跳,並會定時將Topic信息註冊到NameServer,(其底層通訊是基於Netty實現的)
  • Broker負責消息存儲,以Topic爲維度支持輕量級的隊列,單機能夠支撐上萬隊列規模,支持消息推拉模型。
  • 具備上億級消息堆積能力,同時可嚴格保證消息的有序性

Producer (生產者):併發

  • 負責產生消息,生產者向消息服務器發送由業務應用程序系統生成的消息
  • 生產者支持分佈式部署。 分佈式生產者經過多種負載平衡模式將消息發送到Broker集羣。 發送過程支持快速失敗而且延遲低
  • 三種方式發送消息:同步、異步和單向

Consumer(消費者):異步

  • 負責消費消息,消費者從消息服務器拉取信息並將其輸入用戶應用程序
  • 也支持「推和拉」模型中的分佈式部署。
  • 它還支持集羣使用和消息廣播。 它提供了實時消息訂閱機制,能夠知足大多數消費者的需求。

Broker Server

Broker Server負責消息的存儲和傳遞,消息查詢,HA高可用等,Broker Server幾個主要模塊組成:async

img

Remoting Module(遠程模塊):broker入口,處理來自客戶端的請求分佈式

Client Manager(客戶端管理):管理client(生產者/消費者)並維護消費者的主題訂閱性能

Store Service(存儲服務):提供簡單的API中數據庫中存儲或查詢消息

HA Service(高可用服務):提供master broker和slave broker之間的數據同步功能

Index Service(索引服務):將message創建索引來提供快速的查詢能力

RocketMQ 總體流程

總體流程

  1. 啓動 NameServer,NameServer啓動後進行端口監聽,等待 Broker、Producer、Consumer 連上來,至關於一個路由控制中心
  2. Broker 啓動,跟全部的 Namesrv 保持長鏈接,定時發送心跳包

    • 心跳包中,包含當前 Broker 信息(IP+端口等)以及存儲全部 Topic 信息
    • 註冊成功後,Namesrv 集羣中就有 Topic 跟 Broker 的映射關係
  3. 收發消息前,先建立 Topic 。建立 Topic 時,須要指定該 Topic 要存儲在哪些 Broker上。也能夠在發送消息時自動建立Topic
  4. Producer 發送消息

    • 啓動時,先跟 Namesrv 集羣中的其中一臺創建長鏈接,並從Namesrv 中獲取當前發送的 Topic 存在哪些 Broker 上
    • 而後跟對應的 Broker 創建長鏈接,直接向 Broker 發消息
  5. Consumer 消費消息

    • 跟其中一臺 Namesrv 創建長鏈接,獲取當前訂閱 Topic 存在哪些 Broker 上
    • 而後直接跟 Broker 創建鏈接通道,開始消費消息*RocketMQ的消息領域模型

RocketMQ Message

img

Topic(主題): 表示消息的第一級類型,是最細粒度的訂閱單位(生產者傳遞消息和消費者提取消息標識)

  • 一條消息必須有一個Topic
  • 一個Group能夠訂閱多個Topic的消息
  • Topic通常爲領域範圍,好比交易消息

Tag(標籤): 表示消息的第二級類型,能夠是使用相同的Topic不一樣的Tag來表示同一業務模塊的不一樣任務的消息,好比交易消息又能夠分爲:交易建立消息,交易完成消息等

  • 助於保持代碼整潔和一致
  • 簡化RocketMQ提供的查詢系統

Message(消息體): 消息是要傳遞的信息。 Message中必須包含一個Topic,可選Tag和key-vaule鍵值對

Message Queue(消息隊列): 全部消息隊列都是持久化

  • 一個Topic下能夠有多個Queue
  • Queue的引入使得消息的存儲能夠分佈式集羣化,具備了水平擴展能力

Group(組): 分爲Producer Group(生產者組)和Consumer Group(消費者組),具備相同角色組成Group

  • 原生產者在交易後崩潰,broker能夠聯繫同一輩子產者組的不一樣生產者實例以進行提交或回退交易。
  • 消費者組的消費者實例必須具備徹底相同的主題訂閱

RocketMQ 特性

Message Model(消息模式):

  • Clustering(集羣式):當使用集羣消費模式時,MQ 認爲任意一條消息只須要被集羣內的任意一個消費者處理便可
  • Broadcasting(廣播式):當使用廣播消費模式時,MQ 會將每條消息推送給集羣內全部註冊過的客戶端,保證消息至少被每臺機器消費一次

Message Order(消息順序)

  • 使用DefaultMQPushConsumer時,能夠決定按順序或同時使用消息

    • Orderly:有序地使用消息意味着消息的消費順序與生產者爲每一個消息隊列發送消息的順序相同。( 若是要處理必須強制執行全局順序的狀況,請確保您使用的主題只有一個消息隊列)
    若是指定按順序使用,則消息使用的最大併發度是使用者組訂閱的消息隊列數
    • Concurrently:同時使用消息時,消息使用的最大併發性僅受爲每一個使用方客戶端指定的線程池限制
    在此模式下再也不保證消息順序

Message Types(消息類型)

  • 事務消息
  • 順序消息
  • 延遲消息

RocketMQ單機版安裝

  1. 下載編譯源碼

    # 下載$ 
      > wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source- > 
      # 解壓$
      >unzip rocketmq-all-4.7.0-source-release.zip
      > cd rocketmq-all-4.7.0/
      # 編譯$
      > mvn -Prelease-all -DskipTests clean install -U
      > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
  2. 啓動 Name Server

    # 啓動 Name Server 服務
     > nohup sh bin/mqnamesrv &
     # 啓動完成後,查看日誌$
     > tail -f ~/logs/rocketmqlogs/namesrv.log
      The Name Server boot success...
  3. 啓動 Broker

    conf 目錄下,RocketMQ 提供了多種 Broker 的配置文件:

    • broker.conf :單主,異步刷盤。
    • 2m/ :雙主,異步刷盤。
    • 2m-2s-async/ :兩主兩從,異步複製,異步刷盤。
    • 2m-2s-sync/ :兩主兩從,同步複製,異步刷盤。
    • dledger/ :Dledger 集羣,至少三節點
    # 啓動 Broker服務
     > nohup sh bin/mqbroker -n localhost:9876 &
     # 啓動完成後,查看日誌$
     > tail -f ~/logs/rocketmqlogs/broker.log 
      The broker[%s, 172.30.30.233:10911] boot success...

其中,參數:

  • 經過 -c 參數,配置讀取的主 Broker 配置
  • 經過 -n 參數,設置 RocketMQ Namesrv 地址
  1. Send & Receive Messages(消息發送與接收)

    在發送/接收消息以前,咱們須要告知client(生產者/消費者)Name Servers的地址。 RocketMQ提供了多種方法來實現:

    • 在代碼中設置:producer.setNamesrvAddr("ip:port")
    • java屬性配置:rocketmq.namesrv.addr
    • 環境變量配置:NAMESRV_ADDR
    • HTTP Endpoint

爲簡單起見,咱們使用環境變量:NAMESRV_ADDR,以下所示:

# 設置 Name Servers的地址$
 > export NAMESRV_ADDR=localhost:9876
 # 生產消息$
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...
 # 消費消息$ 
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
各位看官還能夠嗎?喜歡的話,動動手指點個💗,點個關注唄!!謝謝支持!

歡迎關注公衆號【Ccww技術博客】,原創技術文章第一時間推出

相關文章
相關標籤/搜索