RocketMQ知識整理與總結

一、架構

 

  RocketMQ的master broker與master broker沒有任何消息通信,nameserver之間也一樣沒有消息通訊c++

  MQ歷史     

    由數據結構隊列發展而來   sql

  MQ使用場景

         異步處理
      解耦
      削峯填谷
      數據同步api

二、隊列

  rocketMQ一個主題(topic)包含多個隊列服務器

  

 


三、使用

  生產

    同步(sync)

      默認重試2次總共3次數據結構

      默認等待超時時間爲3s     架構

    異步(async)

      總共重試2次併發

    單向(oneway)

    Message

      topic:主題名稱異步

      tag:消息TAG,用於消息過濾對消息的總體分類,好比 topic爲物流跟蹤軌跡 ,軌跡包含 攬收 出庫 入庫 派送 簽收,能夠分別給這些相同topic不一樣類型的數據打標籤分類解析處理jvm

      keys:Message索引鍵,多個用空格隔開,RocketMQ能夠根據這些key快速檢索到消息對消息關鍵字的提取方便查詢,好比一條消息某個關鍵字是 運單號,以後咱們可使用這個運單號做爲關鍵字進行查詢
      waitStoreMsgOK:消息發送時是否等消息存儲完成後再返async

      delayTimeLevel:消息延遲級別,用於定時消息或消息重

      User property:自定義消息屬性

    批量發送

      單批次消息不能超過maxMessageSize大小(默認4M)

      客戶端instance:若是instance爲默認值DEFAULT的話,RocketMQ會自動將instance設置爲IP+進程ID(建議不要設置,默認生成就好),默認最大4M

      鉤子方法:能夠執行先後通知

    事物消息

      1分鐘回查一次,默認5次

      事物消息單獨一篇   

  消費

        批量消費總數爲32,broker設置

      若是消息消費次數超過maxReconsumeTimes還未成功,則將該消息轉移到一個失敗隊列,等待被刪除

      消息消費超時時間,默認爲15分鐘

      消息最大重試次數,默認爲16次

      consumeConcurrentlyMaxSpan,併發消息消費時處理隊列最大跨度,默認2000,表示若是消息處理隊列中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒後再拉取消息

      pullInterval=0,推模式下拉取任務間隔時間,默認一次拉取任務完成繼續拉取

      consumeMessageBatchMaxSize:消息併發消費時一次消費消息條數,通俗點說就是每次傳入MessageListtener#consumeMessage中的消息條數

      RocketMQ消息重試是以消費組爲單位,而不是主題,消息重試主題名爲%RETRY%+消費組名。消費者在啓動的時候會自動訂閱該主題,參與該主題的消息隊列負載

      同一個消息隊列只會分配給一個消費者,故若是消費者個數大於消息隊列數量,則有些消費者沒法消費消息。

      若是延遲級別大於0,則會將消息的主題設置爲SCHEDULE_TOPIC_XXXX

      transactionId 事物ID會本身生成

      ConsumeFromWhere

        CONSUME_FROM_FIRST_OFFSET:從頭開始消費

        ONSUME_FROM_TIMESTAMP:從消費者啓動的時間戳對應的消費進度開始消費

        CONSUME_FROM_LAST_OFFSET:從隊列最新偏移量開始消費

        CONSUME_SUCCESS:消費成功

        RECONSUME_LATER:延遲消費,放棄本批次消息消費 相似於continue,若是有重試次數沒有達到最大上限會再次消費

    消息消費模式

      集羣模式:默認模式,主題下的同一條消息只容許被其中一個消費者消費

             消費進度存儲在服務端

      廣播模式:主題下的同一條消息將被集羣內的全部消費者消費一次

           消費進度存儲在消費者本地

    消息傳輸模式

      拉取消息模式:消費端主動發起拉消息請求

            長輪詢模式使得消息拉取能實現準實時

            從服務器拉取消息->放入內存隊列->提交消息處處理線程池

    併發消費

       推送消息模式:RocketMQ消息推模式的實現基於拉模式

      RocketMQ並無真正實現推模式,而是消費者主動向消息服務器拉取消息,RocketMQ推模式是循環向消息服務端發送消息拉取請求

      單獨線程池拉取消息,而後調用監聽api接口

      單獨線程池拉取->內存隊列->消息處理線程池處理->移除客戶端內存隊列消息並更新進度

    順序消息

      消費過程 消息隊列負載->消息拉取->消息消費->消息消費進度存儲。

      支持局部順序消息消費,也就是保證同一個消息隊列上的消息順序消費

      若是要實現某一主題的全局順序消息消費,能夠將該主題的隊列數設置爲1,犧牲高性能和可用性

      順序消息在建立消息隊列拉取任務時須要在Broker服務器鎖定該消息隊列。

      MAX_TIME_CONSUME_CONTINUOUSLY:每次消費任務最大持續時間,默認爲60s,切換線程

      順序消息消費的併發度爲消息隊列。也就是一個消息消費隊列同一時刻只會被一個消費線程池中一個線程消費。

      達到重試次數上限,轉移到死信隊列,繼續後續消息的消費

    定時消息

      消息發送以後並不當即被消費者消費,而是要等到特定的時間以後才能被消費

      不支持任意時間精度定時發送,只支持配置級別的時間默認爲"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",delayLevel=1表示延遲1s,delayLevel=2表示延遲5s,依次類推。

      SCHEDULE_TOPIC_XXXX定時消息主題

    消息過濾

      tag

        tag服務端只是驗證了TAG的hashcode,客戶端再次對消息進行tag值對比過濾

      sql(SQL92表達式)

        (官方示例有bug)表達式沒有想象的好用,建議你們接收到消息本身判斷篩選

      類過濾:定製過濾消息

        消息過濾服務器(不講解)

        consumer->filterserver->broker

    單向消息(One-way)

      其實就是UDP協議的實現

      TCP協議是可靠消息傳輸協議,請求消息都會有相應和校驗,在會話層和傳輸層解決應答

四、填坑方案   

  若是有大量消息積壓

    增長消費者數量

  若是有大量消息積壓而且立刻就到了自動清理的時間

    從新消費導流到新的topic,增大新topic的隊列數量

五、bug   

  netty epoll 4.4.0以前版本沒有實現

六、問題   

  爲何某條消息報異常會阻塞整個隊列消費    

    ProcessQueue中隊列最大偏移量與最小偏離量的間距,不能超過consumeConcurrentlyMaxSpan,不然觸發流控。

    這裏主要的考量是擔憂一條消息堵塞,消息進度沒法向前推動,可能形成大量消息重複消費

七、初始化客戶端註解   

  使用@PostConstruct 由JSR-250提供,在構造函數執行完以後執行,等價於xml配置文件中bean的initMethod

  若是同一個jvm中同時注入生產者和消費者使用bean註解會有異常拋出

八、客戶端支持的驅動程序   

  Java

  Go

  .net

  Php

  c++

  Nodejs

相關文章
相關標籤/搜索