閱讀rocketmq技術內幕、實戰與原理雜記 - 設計

最近正在研究rocketmq,簡單記錄下設計的不一樣數據庫

互聯網系統中Rpc、服務治理、消息中間件基本都是標配,消息中間件能解耦,削峯,高可用並能間接提供達到最終一致性服務器

消息中間件中,消息消費分爲最多一次,至少一次和恰好一次,若是須要實現恰好一次,則系統設計難度增大,系統性能損失增長,權衡利弊,rocket實現的是最少一次,消費端可能會重複接收消息(ACK模式下,ACK消息可能丟失),由消費端冪等消費網絡

爲何不用zk,仍是從實際需求出發,Topic路由信息無需在集羣之間保持強一致性,最終一致便可,從而減小對zk的依賴和性能的損失併發

消息存儲方面,rocket引入文件組,無限循環使用,commitlog文件每一個1G,以第一個偏移值爲文件名,爲了和consumequeue一致,log中還包含了tag,key等信息便於恢復,順序寫,引入內存映射,相同主題的消息被順序存儲在同一文件中,還提供定時清理等防止過分堆積,利用消費隊列文件和索引文件及pagecache等提高讀性能,ConsumeQueue是消息的邏輯隊列,相似數據庫的索引文件,存儲的是指向物理存儲的地址。每一個Topic下的每一個Message Queue都有一個對應的ConsumeQueue文件, 裏面有一部分是存儲了tag對應的hashcode,通過對比,符合要求的消息被從commitlog中讀取出來,消息在消費前,會對比完整的Message Tag字符串,清除hash衝突形成的誤讀負載均衡

消息過濾,基於tag等,在存儲設計上基於hash等方式提高過濾效率,能夠從Broker或者消費端過濾,broker端過濾能夠減小傳遞到消費端的消息,減小網絡損失,消費端過濾能夠由消費者任意定義性能

定時消息,若是要支持任意精度的定時消息消費,必須在消息服務端對消息進行排序,勢必帶來很大的性能損耗,rocketmq設計不支持任意進度的定時消息,只支持特定延遲級別this

客戶端支持Push(被推送)、pull(自主控制messagequeue的遍歷及消息的讀取)兩種模式spa

線程池設計,rocketmq會根據不一樣的任務類型建立不一樣的線程池,若是該類型沒註冊,則由other之類的線程池統一處理線程

Namesrv之間數據能夠不一致,彼此之間互不通訊設計

消息發送端提供容錯機制,這個地方以前我就有疑問,爲何在客戶端或者消費端獲取消息存儲meta信息以後,namesrv發現變化後不會通知他們。。。原來是由meta使用端的容錯機制來保證高可用,下降namesrv的複雜性

 消息的順序性保證,若是要全局一致,必須單一topic,單一輩子產者及消費者,清除一切併發,可行性比較低,性能和吞吐量沒法接受,結合業務,通常是部分順序消息,發送端將同一業務ID的消息發送到同一個Message Queue,在消費過程當中,不併發處理

CommitLog同步,不是通過netty命令的方式,而是直接TCP鏈接,效率更高,鏈接成功後,經過對比master和slave的offset,不斷進行同步

從broker得到的消息,由於是提交到線程池裏並行執行,很難監控和控制執行狀態,RocketMQ定義了一個快照類ProcessQueue來解決

負載均衡或消息分配是在消費者端代碼中完成,Consumer從broker處獲取全局消息,而後本身作負載均衡,只處理分給本身的部分

跟kafka同樣,總的消費者數量不要超過topic的隊列數,不然多餘的消費者收不到消息

Namesrv自己無狀態,其中的Broker,topic等狀態信息不會持久存儲,都是由各個角色按期上報並存儲到內存中

事物消息的實現:發送方向RocketMQ發送「待確認」消息,RocketMQ將收到的「待確認」消息持久化後,向發送方回覆消息已經發送成功,發送方開始執行本地事件邏輯,發送方根據本地事件邏輯想RocketMQ發送二次確認,RocketMQ收到commit狀態則將第一階段消息標記爲可投遞,訂閱方將能收到該消息,收到rollback狀態則刪除第一階段的消息,若是出現異常,服務器在一段時間後未收到確認消息,則服務器將對「待確認」消息發起回查請求,發送方收到回查請求後經過檢查對應消息的本地事件執行結果返回對應的狀態,RocketMQ收到後繼續處理

服務端接受到新請求後,若是隊列沒有新消息,並不急於返回,經過一個循環不斷查看狀態,長輪詢的核心是,broker端hold住客戶端過來的請求一小段時間,在這個時間內有新消息到達,就利用現有的連接當即返回給消息的consumer,長輪詢主動權仍是掌握在消費端手中,即便broker消息大量積壓,也不會主動推送給消費者

 

在同步刷盤過程種,有一個設計,避免了任務提交與任務執行的鎖衝突,因爲避免同步刷盤消費任務與其餘消費生產者提交任務直接的鎖競爭,GroupCommitService提供讀容器與寫容器,這兩個容器每執行完一次任務後,交互,繼續消費任務。

        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

        public synchronized void putRequest(final GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

        private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }
相關文章
相關標籤/搜索