背景介紹
項目組使用阿里RocketMQ,對同一個消費組設置不一樣的tag訂閱關係,出現消息丟失的問題,本文從rocketmq源碼研究消息發佈與訂閱原理,並分析致使該問題的緣由。數據結構
官方說明
- 告訴使用者:同一個消費組,必須保持訂閱關係一致
- 爲何?它沒有說!只能從源碼找答案
問題復現
- 啓動消費者1,消費組爲group1,訂閱topicA的消息,tag設置爲tag1 || tag2
- 啓動消費者2,消費組也爲group1,也訂閱topicA的消息,可是tag設置爲tag3
- 啓動生產者,生產者發送含有tag1,tag2,tag3的消息各10條
- 消費者1沒有收到任何消息,消費者2收到部分消息
先上結論
- 同一個消費組中,設置不一樣tag時,後啓動的消費者會覆蓋先啓動的消費者設置的tag
- tag決定了消息過濾的條件,通過服務端和客戶端兩層過濾,最後只有後啓動的消費者才能收到部分消息
原理說明
消息如何保存
CommitLog
- 保存全部topic的原始消息
- CommitLog分爲多個文件,每一個文件默認最大爲1G
- 每條記錄包括:消息長度和消息文本(消息體,屬性,uid等等)
- 因每條消息長度不一致,每一個commitLog的記錄長度也不一致
ConsumerQueue
- 保存某個Topic下某個Queue的索引信息
- 每條記錄包括:消息在commitLog中的offset,消息大小,消息tag的哈希值
- 每條記錄長度固定爲20byte
- producer發送消息後,先保存到commitLog,再異步創建該條消息對應的topic + queue對應的ConsumerQueue索引
- 第三部分的Hash(tag)是服務端過濾消息的重要依據
consumer如何訂閱消息
註冊訂閱信息
- consumer訂閱時,會將訂閱信息註冊到到服務端
- 保存訂閱信息的是Map類,key爲topic,value主要是tag
- subVersion取當前時間。
這裏的key是topic,subVersion版本號,這兩點很關鍵!後面有用到!異步
拉取消息並過濾
- 拉取消息時,首先從服務端獲取訂閱關係,獲得tag的hash集合codeSet
- 而後從ConsumerQueue獲取一條記錄,判斷記錄的hashCode是否在codeSet中,以達到消息過濾的目的,決定是否將該消息發送給consumer
- 總之一句話:tag決定了消息是否發到客戶端
消息過濾
服務端過濾
客戶端過濾
- 服務端過濾存在不許確性,客戶端再次精確過濾
- 客戶度過濾:tag的字符串值作對比。不相等的不返回給消費者
緣由總結
- 同一個consumer group的訂閱關係,保存在RebalanceImpl類的Map中。key爲topic
- 不一樣的消費者啓動後,依次註冊訂閱關係,由於tag不同,致使Map中同一topic的tag被覆蓋。好比:消費者1訂閱tag1,消費者2訂閱tag2。最後map中只保存tag2.
- 過濾的核心是是tag,tag被更新,過濾條件被改變。服務端過濾後只返回tag2的消息
- 客戶端接收消息後,再次過濾。先啓動的消費者1訂閱tagA,可是服務端返回tag2,因此消費者1收不到任何消息。消費者2能收到一半的消息(集羣模式,假設消息平均分配,另一半分給tag2)
源碼分析
訂閱關係數據結構
消費者1啓動時註冊的訂閱關係
消費者2後啓動覆蓋訂閱關係
服務端過濾時取出ConsumerQueue的Hash(tag)
對比消息的Hash(tag)和以前保存的訂閱關係
客戶端過濾