Apache RocketMQ是一個分佈式消息和流處理平臺,具備低延遲,高性能和高可靠性,億萬級容量和靈活的可擴展性。它由四部分組成:名稱服務器,代理服務器,生產者和消費者。它們中的每個均可以水平擴展,而不會出現單點故障。如上圖所示。git
名稱服務器集羣github
名稱服務器提供輕量級服務發現和路由。每一個名稱服務器記錄完整的路由信息,提供相應的讀寫服務,支持快速的存儲擴展。shell
代理集羣apache
代理關注的是消息存儲,它經過提供輕量級主題(TOPIC)和隊列(QUEUE)機制來處理消息存儲。他們支持推,拉模型,包含容錯機制(2個副本或3個副本), 可以抵禦強峯值,而且按序積壓千億條消息的的功能。此外,代理還提供容災,豐富的度量統計數據和報警機制,這些都是傳統消息系統所缺乏的。編程
生產者集羣設計模式
生產者支持分佈式部署,分佈式生產者經過多種負載平衡模式向代理集羣發送消息,發送進程支持快速故障和低延遲。bash
消費者集羣服務器
消費者集羣也支持推,拉模式的分佈式部署。它還支持集羣消費和消息廣播。它提供了實時消息訂閱機制,能夠知足大多數消費者的需求,RocketMQ的網站爲感興趣的用戶提供了一個很是簡單的快速入門指南。架構
名稱服務器是一個功能齊全的服務,主要包含兩個功能:併發
如咱們所知,RocketMQ客戶端(生產者/消費者)將從NameServer查詢隊列路由信息,可是客戶端如何找到NameServer地址的呢?
有四種方式向客戶端提供名稱服務器地址列表,以下:
producer.setNamesrvAddr("ip:port")
.rocket.namesrv.addr
.NAMESRV_ADDR
.關於更深刻的介紹客戶端如何找到NameServer地址的,請查看這裏
代理服務器負責消息存儲和傳遞,消息查詢,高可用保證等。
以下圖所示, 代理服務器有如下幾個重要的子模塊:
本節介紹生產就緒,部署解決方案。通常來講,咱們正在部署一個沒有單點故障的彈性RocketMQ集羣。
在開始本節以前,請確保您已經閱讀了快速上手部分,而且熟悉RocketMQ的核心概念和組件。
生產就緒部署
爲了確保集羣在一個實例宕機時仍然可以正常工做,建議使用兩個或多個名稱服務器實例,只要有一個名稱服務器實例處於存活狀態,整個集羣就保持服務狀態。
名稱服務器遵循無共享設計模式,代理服務器將心跳數據發送到全部名稱服務器,生產者和消費者能夠在發送/消費消息時從任何可用的名稱服務器查詢元數據。
代理能夠根據其角色分爲兩類:主代理和從代理。主代理提供RW(讀寫)訪問,而從代理只接收讀訪問。
要在沒有單點故障的狀況下部署高可用RockeMQ集羣,應該部署一系列代理集。一個代理集包含一個主代理和幾個從代理,其中主代理brokerid設置爲0,從代理brokerid設置爲非0便可。一組代理集中都代理有相同的代理名稱(brokerName)。在極端狀況下,在一個代理集中至少須要設置兩個代理。每一個主題駐留在兩個或多個代理中。
部署RocketMQ集羣時,建議使用如下配置:
Broker configuration
Property Name | Default value | Details |
---|---|---|
listenPort | 10911 | listen port for client |
namesrvAddr | null | name server address |
brokerIP1 | InetAddress for network interface | Should be configured if having multiple addresses |
brokerName | null | broker name |
brokerClusterName | DefaultCluster | this broker belongs to which cluster |
brokerId | 0 | broker id, 0 means master, positive integers mean slave |
storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |
deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |
fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLVAE |
flushDiskType | ASYNC_FLUSH | {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance. |
RocketMQ提供了一個CLI(命令行界面)管理工具,用於查詢,管理和診斷各類問題。
如何得到
管理工具是隨RocketMQ一塊兒提供,你要麼下載一個預構建的二進制版本,要麼本身從源代碼構建,這樣你就擁有它了。
若是您須要源代碼, RocketMQ工具模塊包含其源代碼。
如何使用
管理工具很是容易使用,這裏處於演示的目的,假設爲Linux的環境。在mq安裝目錄下的/bin目錄中,使用bash命令: mqadmin, 就能夠看到如下的幫助菜單:
The most commonly used mqadmin commands are:
updateTopic Update or create topic
deleteTopic Delete topic from broker and NameServer
updateSubGroup Update or create subscription group
deleteSubGroup Delete subscription group from broker
updateBrokerConfig Update broker's config
updateTopicPerm Update topic perm
topicRoute Examine topic route info
topicStatus Examine topic Status info
topicClusterList get cluster info for topic
brokerStatus Fetch broker runtime status data
queryMsgById Query Message by Id
queryMsgByKey Query Message by Key
queryMsgByUniqueKey Query Message by Unique key
queryMsgByOffset Query Message by offset
queryMsgByUniqueKey Query Message by Unique key
printMsg Print Message Detail
sendMsgStatus Send msg to broker
brokerConsumeStats Fetch broker consume stats data
producerConnection Query producer's socket connection and client version
consumerConnection Query consumer's socket connection, client version and subscription
consumerProgress Query consumers's progress, speed
consumerStatus Query consumer's internal data structure
cloneGroupOffset Clone offset from other group
clusterList List all of clusters
topicList Fetch all topic list from name server
updateKvConfig Create or update KV config
deleteKvConfig Delete KV config
wipeWritePerm Wipe write perm of broker in all name server
resetOffsetByTime Reset consumer offset by timestamp(without client restart)
updateOrderConf Create or update or delete order conf
cleanExpiredCQ Clean expired ConsumeQueue on broker.
cleanUnusedTopic Clean unused topic on broker
startMonitoring Start Monitoring
statsAll Topic and Consumer tps stats
syncDocs Synchronize wiki and issue to github.com
allocateMQ Allocate MQ
checkMsgSendRT Check message send response time
clusterRT List All clusters Message Send RT
複製代碼
爲了確保不會丟失任何成功發佈的消息,RocketMQ提供了一種複製模式,經過兩種複製方式: 同步和異步,以得到更強的持久性和高可用性。
主從複製: 同步/異步代理
與許多複製系統同樣,同步代理要等到提交日誌被複制到從服務器後才能確認。相反,異步代理在主服務器上處理消息後當即返回。
如何配置
在conf文件夾下的rocketmq發行版附帶了三個預構建的配置供您參考。
2m-2s-sync
2m-2s-async
2m-noslave
複製代碼
注意: 全部的配置使用異步刷新的方式.
部署
以2M-2S-SYNC的部署爲例,首先,啓動兩個名稱服務器,如快速啓動部分所示: 假設他們的IP爲192.168.0.2和192.168.0.3
開啓代理(假設二進制rocketmq位於/home/rocketmq/dist)
>cd /home/rocketmq/dist/bin
>bash mqbroker -c ../conf/2m-2s-sync/broker-a.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-b.properties -n 192.168.0.2:9876,192.168.0.3:9876
>bash mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties -n 192.168.0.2:9876,192.168.0.3:9876
How to verify
Execute the following command to verify according to the CLI section:
> bash mqadmin clusterlist
複製代碼
瞭解了MQ的一些基本模型和概念以後,咱們能夠深刻探討消息傳遞系統設計的一些問題:
生產者將業務應用程序系統生成的消息發送給代理服務器,RocketMQ提供了多種發送模式: 同步,異步和單向傳輸。
生產組
相同角色的生產者被分組在一塊兒。若是一臺生產者實例在處理事務時宕機了,代理能夠聯繫同一輩子產者組的不一樣生產者實例來提交或者回滾事務。
考慮到所提供的生產者在發送消息時足夠強大,每一個生產組只容許一實例,以免沒必要要的生產者實例初始化。
消費者從代理服務器中拉取消並將消息輸入應用程序。從用戶應用的角度來看,提供了兩種類型的消費者:
推送消費者
另外一方面,punsh-consumer封裝了消息拉取,消耗進度和維護內部的其餘工做,爲最終用戶留下一個回調接口來實現,該接口將在消息到達時執行。
拉取消費者
拉消費者積極從代理服務器中拉取消息,一旦一批消息被拉取出來,用戶應用程序就會啓動消費過程。
消費組
與前面提到的生產者組相似,具備徹底相同角色的消費者被分組在一塊兒,並命名爲消費者組。
消費組是一個很好的概念,使得在消息消費方面,實現負載均衡和容錯的目標很是容易。
注意:消費者組的消費實例必須具備徹底相同的主題訂閱.
主題是生產者投遞消息,消費者拉取消息的一個類別。主題的生產者,消費者的關係很是鬆散。具體來講,一個主題能夠有0個,1個或者多個向其發送消息的生產者;相反,生產者能夠發送不一樣主題的消息。從消費者角度來看,一個主題能夠由0個,1個或多個消費者羣體訂閱。一樣,只要消費組的實例保持訂閱一致,用戶組就能夠訂閱一個或多個主題。
消息是要傳遞的信息。消息必須有一個主題,能夠將其解釋爲要郵寄信件的地址。消息還能夠具備可選的標記和額外的鍵值對。例如,您能夠爲消息設置業務ke,並在代理服務器上查找消息,以診斷開發過程當中的問題。
消息隊列
主題被劃分爲一個或多個子主題:"消息隊列"。
標籤
換句話說,標籤子主題爲使用者提供了額外的靈活性。對於標籤,來自同一業務模塊的具備不一樣目的的消息,可能具備相同的主題和不一樣標記。標籤將有助於保持代碼的整潔和一致,並且標籤還能夠幫助RocketMQ提供的查詢系統。
代理
代理是RocketMQ系統的主要組成部分,它接收來自生產者的消息,存儲它們,並準備處理來自消費者的拉取請求。它還存儲與消息相關的元數據,包括消費組,消費進度偏移量和主題/隊列信息。
名稱服務器用做路由信息提供者。生產者/消費者客戶端查找主題以查找相應的代理列表。
當使用DefaultMQPushConsumer時,您能夠決定是有序的或者是併發的消費消息。
按順序消費消息意味着,消息的消費順序與生產者爲每一個消息隊列發送的順序相同,若是您正在處理全局順序是必需的場景,請確保您使用的主題只有一個消息隊列。
注意:若是指定了按順序消費,則消息消費的最大併發性是消費組訂閱的消息隊列數。
當併發消費消息時,消費的最大併發性僅僅受每一個消費者客戶端指定的線程池的限制。
注意:在此模式下,再也不保證消息的順序