RocketMQ做爲一款純java、分佈式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。java
由開源社區killme2008維護,開源社區很是活躍。https://github.com/killme2008/Metamorphosisgit
於2012年10月份上線,在淘寶內部被普遍使用。github
Metaq 3.0發佈時,產品名稱改成RocketMQ。基於公司內部開源共建原則,RocketMQ項目只維護核心功能,且去除了全部其餘運行時的依賴,核心功能最簡化。每一個BU的個性化需求都在RocketMQ項目之上進行深度定製。RocketMQ向其餘BU提供的僅僅是jar包,例如要定製一個Broker,那麼只須要依賴rocketmq-broker這個jar包便可,可經過API進行交互,若是定製client,則依賴rocketmq-client這個jar包,對其提供的api進行再封裝。docker
在Metaq1.x/2.x的版本中,分佈式協調採用的是Zookeeper,而RocketMQ本身實現了一個NameServer,更加輕量級,性能更好!數據庫
消息生產者,生產者的做用就是將消息發送到 MQ,生產者自己既能夠產生消息,如讀取文本信息等。也能夠對外提供接口,由外部應用來調用接口,再由生產者將收到的消息發送到 MQ。api
生產者組,簡單來講就是多個發送同一類消息的生產者稱之爲一個生產者組。在這裏能夠不用關心,只要知道有這麼一個概念便可。緩存
消息消費者,簡單來講,消費 MQ 上的消息的應用程序就是消費者,至於消息是否進行邏輯處理,仍是直接存儲到數據庫等取決於業務須要。網絡
消費者組,和生產者相似,消費同一類消息的多個 consumer 實例組成一個消費者組。數據結構
Topic 是一種消息的邏輯分類,好比說你有訂單類的消息,也有庫存類的消息,那麼就須要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。架構
Message 是消息的載體。一個 Message 必須指定 topic,至關於寄信的地址。Message 還有一個可選的 tag 設置,以便消費端能夠基於 tag 進行過濾消息。也能夠添加額外的鍵值對,例如你須要一個業務 key 來查找 broker 上的消息,方便在開發過程當中診斷問題。
標籤能夠被認爲是對 Topic 進一步細化。通常在相同業務模塊中經過引入標籤來標記不一樣用途的消息。
Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的消息,儲存以及爲消費者拉取消息的請求作好準備。
Name Server 爲 producer 和 consumer 提供路由信息。
RocketMQ是一個分佈式消息中間件,並支持事務消息、順序消息、批量消息、定時消息、消息回溯等。它裏面有幾個區別於標準消息中件間的概念,如Group、Topic、Queue等。系統組成則由Producer、Consumer、Broker、NameServer等。
RocketMQ以Topic來管理不一樣應用的消息,對於生產者(producer)而言,發送消息時須要指定消息的Topic,對於消費者(consumer)而言,在啓動後須要訂閱相應的Topic,而後能夠消費相應的消息。Topic是邏輯上的概念,在物理實現上,一個Topic由多個Queue組成,採用多個Queue的好處是能夠將Broker存儲分佈式化,提升系統性能。
NameServer的功能,在RocketMQ的前身是使用ZooKeeper。NameServer用於管理全部Broker節點信息,接收Broker的註冊/註銷請求,此外還記錄了Topic與Broker、Queue的對應關係,Broker主備信息。BrokerId爲0表明是MasterBroker,不然BrokerId大於0的表示爲SlaveBroker,Master和Slave組成一個Broker,具備相同的brokerName。Broker在啓動的時候會去NameServer進行註冊,會維護Broker的存活狀態,Broker每次發送心跳過來的時候都會把Topic信息帶上。NamesrvStartUp爲啓動類、NamesrvController爲控制類、RouteInfoManager存放了Topic隊列信息以及地址列表等一系列重要數據結構並提供了對應的數據變動接口、DefaultRequestProcessor負責處理所broker發過來的全部網絡消息。各NameServer之間是相互獨立且沒有通訊的,經過給Broker的namesrvAddr配置多個NameServer地址,同時向多個NameServer註冊信息來實現NameServer集羣。由於NameServer讀寫壓力比較小,因此穩定性較高。相應的生產者/消費者中的namesrvAddr也是配置多個。
Broker,每一個Broker都會和NameServer創建一個長鏈接保持心跳。一個Topic分佈在多個Broker上,一個Broker能夠配置多個Topic。因爲消息分佈在各個Broker上,一旦某個Broker宕機,則該Broker上的消息讀寫都會受到影響。因此須要HA機制,RocketMQ的實現方式是master/slave,salve定時從master同步數據,若是master宕機,則slave提供消費服務,可是不能寫入消息。一旦某個broker master宕機,生產者和消費者多久才能發現?受限於rocketmq的網絡鏈接機制,默認狀況下,最多須要30秒,但這個時間可由應用設定參數來縮短期。這個時間段內,發往該broker的消息都是失敗的,並且該broker的消息沒法消費,由於此時消費者不知道該broker已經掛掉。消費者獲得master宕機通知後,轉向slave消費,可是slave不能保證master的消息100%都同步過來了,所以會有少許的消息丟失。可是消息最終不會丟的,一旦master恢復,未同步過去的消息會被消費掉。
SendMessageProcessor處理全部發往broker的消息,由BrokerController調用DefaultMessageStore來保存消息(processRequest -> sendMessage/sendBatchMessage -> getMessageStore().putMessage)。消息體由CommitLog記錄,首先會判斷是否爲延遲消息,若是是則會改寫Topic,並保存好真實的Topic信息,而後寫入對應的MappedFile(MappedByteBuffer)。若是是異步刷盤,異步同步Slave則消息到這裏就算是記錄完了,直接返回producer成功。異步刷盤時,只有機器宕機,纔會產生消息丟失,broker掛掉可能會發生,可是機器宕機崩潰是不多發生的,除非忽然斷電。若是是同步刷盤,消息寫入物理文件纔會返回成功,刷盤本質其實就是調用MappedByteBuffer.force。HA是由master/slave實現,這個也分同步仍是異步。而後還有後臺線程異步的把CommitLog文件同步到ConsumeQueue中,ConsumeQueue是CommitLog的索引,它記錄了消息在CommitLog中的位置。Producer對應CommitLog,發送的消息寫入CommitLog,Consumer對應ConsumeQueue,消費對應的ConsumeQueue隊列。
不管CommitLog,仍是ConsumeQueue,都有一個對應的MappedFileQueue,也就是對應的內存映射文件的鏈表,對外提供一個邏輯上的文件。MapedFileQueue包含了不少MapedFile(AllocateMappedFileService負責建立MappedFile),以及MapedFile的真實大小,MapedFile包含了具體的文件信息,包括文件路徑、文件名、文件起始偏移、寫位移、讀位移,刷盤位移等等信息,同時使用了虛擬內存映射來提升IO效率(MappedByteBuffer)。MapedFile的文件名就是消息在此文件的中初始偏移量(文件的起始偏移量),MapedFile鏈表邏輯上是連續的,就是靠這個機制實現。一個PageSize默認爲4k,對應Linux的PageCache緩存大小,一個MapedFile默認最大爲1G(因此一個消息最大也是1G,在MessageStoreConfig中配置),異步刷盤線程默認1s觸發一次,可是要滿4頁(16k)纔會刷盤,或10s作一次強制刷盤(FlushRealTimeService異步CommitLog刷盤,GroupCommitService同步CommitLog刷盤)。讀寫時根據offset定位到鏈表中,對應的MappedFile進行讀寫。經過MappedFile,就很好的解決了大文件隨機讀的性能問題。MappedFile繼承自ReferenceResource,它裏面實現了一個引用計數,獲取和釋放都要增減這個計數,當引用計數爲0的時候就會回調cleanup方法。MappedFile的cleanup實現就是經過反射調用cleaner().clean()以釋放映射內存,cleaner方法是在DirectByteBuffer裏,MappedByteBuffer實現類就爲DirectByteBuffer,但DirectByteBuffer是package的,外面並不能訪問到。
一臺Broker上全部消息(不論是什麼Topic)都是記錄在一個CommitLog上,CommitLog裏面記錄了每條消息的消費狀況,是否被消費,由誰消費(queueid),該消息是否持久化等信息,每條消息的長度是不同的。同步Slave是由一個單獨的線程順序的同步CommitLog文件,所謂的等待同步Slave成功後才返回,實質是等待同步線程下標到了指定下標而已,因此Master和Slave的CommitLog文件內容及順序都是一致的。CommitLog中存儲的消息格式已經指定好該消息對應的topic及存到consumeQueue中對應的topic的那個隊列(queueid),究竟寫入那個consumequeue的那個queueid,這是由客戶端投遞消息的時候指定的,客戶端作的負載均衡,選擇不一樣queueid投遞。通常來講客戶端是輪詢queue投遞消息,但若是要保證消息順序,原理就是客戶端把相關消息投遞到同一個queueid,這樣消費者消費的時候就是順序讀取了。只要消息到了CommitLog,發送的消息也就不會丟,有後臺線程異步的同步到ConsumeQueue,再由Consumer進行消費。ConsumeQueue是消息的邏輯隊列,至關於字典的目錄,用來指定消息在物理文件CommitLog上的位置。也就是說CommitLog只有一個(順序寫),但ConsumeQueue確有多個(隨機讀),ConsumeQueue與Topic對應。
CommitLog只有一個,寫入消息體的時候爲保證消息順序寫入是會加鎖的,加鎖有兩種方式,一種是ReentrantLock,一種是自旋compareAndSet,加鎖的範圍只限定在寫入MappedFile中,刷盤及同步並不在加鎖範圍。
消息寫入CommitLog以後,會有單獨的線程任務(ReputMessageService)每隔1毫秒讀取CommitLog文件,把新的消息信息分別調用CommitLogDispatcherBuildConsumeQueue及CommitLogDispatcherBuildIndex的dispatch方法加入到ConsumeQueue及IndexService中去。IndexService以Message Key構建了索引,能夠經過Key來過濾查詢消息。ConsumeQueuek中的消息都是定長的(20字節),消息數量也是固定的(也就是物理文件是固定大小),物理文件名字和CommitLog同樣都是開始偏移量。一個ConsumeQueue只對應一個Topic,包含了消息在CommitLog的開始位置、大小以及tagsCode,只因此使用tag的HashCode就是爲了保持長度固定,爲保證準確在後面使用tag過濾的時候還會再進行一次字符串比較。消費者消費的時候是先從指定的ConsumeQueue中拉取消息ID以及進行一次簡單的Tag過濾(若是須要的話),而後再一次的讀取CommitLog文件獲取真正的消息體。
若是消息是事務消息且狀態是PREPARED或ROLLBACK的,則不會同步到ConsumeQueue中,見CommitLogDispatcherBuildConsumeQueue.dispatch,只有當消息不是事務消息或事務狀態爲COMMIT時纔會同步到ConsumeQueue。事務消息是基於二階段提交:一階段,向Broker發送一條PREPARED,記錄在CommitLog中並返回消息偏移位置,本地事務須要提供一個RocketMQ的回調(LocalTransactionExecuter),用於回調執行本地事務。二階段,處理完本地事務後,返回本地事務狀態,根據狀態(COMMIT或ROLLBACK)去設置消息,而後添加到CommitLog中,最後同步到ConsumeQueue進行消費(事務消息記錄了兩次CommitLog一次ConsumeQueue)。
若是消息是延遲消息,則消息會先投遞到SCHEDULE_TOPIC_XXXX中(消息內容也同樣是記錄在CommitLog),這個topic有若干隊列, 每個隊列對應了一個延遲level(延遲時間並非精準的),會有一個任務(ScheduleMessageService)去輪詢這些隊列,等時間到了則把消息從新寫入到原來的Topic,而後同步到ConsumeQueue中(延遲消息投遞了兩次,即兩次CommitLog兩次ConsumeQueue)。
PullMessageProcessor處理消費者的請求,由BrokerController調用DefaultMessageStore.getMessage來從指定的Topic下的QueueId隊列的QueueOffset下標開始拉取一批消息。首先根據Topic及QueueId定位到ConsumeQueue,而後根據QueueOffset獲取到MappedFile而且返回指定位置開始的內存映射對象SelectMappedBufferResult。而後開始從指定位置遍歷ConsumeQueue,通過濾Tags後,從CommitLog指定位置獲取消息體,若是有過濾表達式則過濾,經過後把消息加入到結果列表。若是消息體過大,Master剩餘物理內存不夠,或者開啓Slave讀取消息,則會設置讓客戶端從Slave拉取消息。若是須要在發送消費消息前進行什麼處理,能夠註冊ConsumeMessageHook,默認沒有。最後向客戶端寫入消息內容,寫入消息內容有兩種方式:一種是把消息內容讀取出來返回,還有一種是使用Netty的FileRegion領拷貝機制直接把內容從堆外內存中返回,默認爲第一種讀取到堆內存返回,這裏是否是由於節省的大量小消息的複製還不如堆外內存建立的開銷。RocketMQ拉取消息是長輪詢,若是沒有查詢到消息,條件知足的話會掛起請求。