RocketMQ是一款分佈式、隊列模型的消息中間件。
特徵及實現原理:
特色:
1. 支持嚴格的消息順序;
2. 支持Topic與Queue兩種模式;
3. 支持事物;
4. 億級消息堆積能力;
5. 比較友好的分佈式特性;
6. 同時支持Push與Pull方式消費消息;
分佈式消息系統做爲實現分佈式系統可擴展、可伸縮性的關鍵組件,須要具備高吞吐量、高可用等特色。而談到消息系統的設計,就回避不了兩個問題:
1. 消息的順序問題
2. 消息的重複問題
世界上解決一個計算機問題最簡單的方法:「剛好」不須要解決它!--沈詢
解決這兩個問題的方式剛好是不解決,業務端經過設計就能夠規避掉這些問題。
1. 消息順序:
通常消息是經過輪詢全部隊列來發送的(負載均衡策略),順序消息能夠根據業務,好比說訂單號相同的消息發送到同一個隊列。因此RocketMQ的消息順序由Producer保證消息在一個隊列上,Consumer保證消費同一個隊列。
2. 重複消息
形成消息的重複的根本緣由是:網絡不可達。只要經過網絡交換數據,就沒法避免這個問題。因此解決這個問題的辦法就是不解決。消費端處理消息重複的問題便可。
a) 消費端處理消息的業務邏輯保持冪等性
b) 保證每條消息都有惟一編號且保證消息處理成功與去重表的日誌同時出現
3. 事物消息
RocketMQ將事務拆分紅小事務異步執行的方式來完成,消息發送者先發送消息,再執行本地事務,再確根據本地事物執行結果認消息發送成功或者消息回滾;若是消息發送成功,而消息確認發送失敗,RocketMQ會按期掃描事務,並找發送者進行確認,由發送者肯定是回滾仍是繼續發送,消息進入消息系統後,若是消息消費失敗或者超時,則一直重發確保消息被消費,若是消息真的消費失敗,RocketMQ一樣剛好不解決,從而避免提升系統的複雜度。
物理結構:
1. Name Server。提供topic的路由信息,Producer和Consumer都會經過Name Server回去路由信息到Broker,路由信息數據存儲在內存中,broker會定時發送路由信息給Name Server的每一臺機器,來盡心更新,Name Server是無狀態的,能夠橫向擴展。
2. Broker。消息的中轉者,負責存儲和轉發消息。Broker分爲master和slave,一個master能夠對應多個slave,BrokerId=0表示master,其餘的爲slave,Broker長連到Name Server,定時發送Topic信息到全部Name Server。
3. Producer。Producer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic 服務的Master創建長鏈接,且定時向Master發送心跳。Producer 徹底無狀態,可集羣部署。
4. Consumer。Consumer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic 路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。
工做方式:java
Producer向Topic的隊列輪流發送消息,Consumer若是作廣播消費,則一個consumer實例消費這個Topic對應的全部隊列;若是作集羣消費,則多個Consumer實例平均消費這個Topic對應的隊列集合。
部署:
安裝包:https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
設計文檔:http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf
配置Java環境:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
啓動Name Server: 到bin目錄下 nohup mqnamesrv &
啓動Broker: nohup mqbroker -n "192.168.1.111:9876" &
sh mqadmin topicList -n 192.168.1.111::9876 //查看topic列表
sh mqadmin topicStatus -n 192.168.1.111::9876 -t Topic1 //查看Topic1詳情
sh mqadmin consumerProgress -n 192.168.1.111:9876 //查看消費組
broker配置:
在conf目錄下有三個文件夾,對應broker配置,分別是2m-noslave, 2m-2s-async, 2m-2s-sync,分別對應2 master無slave配置,2 master 2 slave異步複製,2 master 2 slave同步雙寫。
使用默認配置:sh mqbroker -n 「192.168.1.111:9876"
生成模板配置:sh mqbroker -m > broker.p
使用配置啓動:sh mqbroker -c broker.p
使用:
Producer:git
DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("rmq-instance"); producer.setVipChannelEnabled(false); // 必須設爲false不然鏈接broker10909端口 producer.start(); try { for (int i = 0; i < 3; i++) { Message msg = new Message("TopicA-test」, "TagA」, "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); }
Consumer:github
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); // 必須設爲false不然鏈接broker10909端口 consumer.setInstanceName("rmq-instance"); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started.");