1、概念html
1. 中間件:位於系統之間的服務服務器
2. 消息中間件:消息隊列MQ,用於接收消息、存儲消息、轉發消息的中間件架構
3. Rocket MQ: 分佈式的消息中間件,生產者、消費者、隊列均可以分佈式分佈式
4. 基於Netty開發ide
2、RocketMQ使用spa
1. 在服務器上安裝Rocket MQ.net
2. 啓動rocket mq,即name server,啓動以後監聽端口,等待broker\producer\consumer鏈接code
3. 啓動broker, 設置對應的name server,broker用於收取和存儲消息server
4. 手動/自動建立Topichtm
5. 消費者代碼
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //聲明並初始化一個consumer //須要一個consumer group名字做爲構造方法的參數,這裏爲consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //一樣也要設置NameServer地址 consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //這裏設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag,*表明所有的Tag consumer.subscribe("TopicTest", "*"); //設置一個Listener,主要進行消息的邏輯處理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,須要稍後從新消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //調用start()方法啓動consumer consumer.start(); System.out.println("Consumer Started."); } }
6. 生產者代碼
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明並初始化一個producer //須要一個producer group名字做爲構造方法的參數,這裏爲producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //設置NameServer地址,此處應改成實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,可是也能夠經過環境變量的方式設置,不必定非得寫死在代碼裏 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //調用start()方法啓動一個producer實例 producer.start(); //發送10條消息到Topic爲TopicTest,tag爲TagA,消息內容爲「Hello RocketMQ」拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //調用producer的send()方法發送消息 //這裏調用的是同步的方式,因此會有返回結果 SendResult sendResult = producer.send(msg); //打印返回結果,能夠看到消息發送的狀態以及一些相關信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發送完消息以後,調用shutdown()方法關閉producer producer.shutdown(); } }
3、架構和名詞
1. NameServer:名稱服務器,爲Producer和Consumer提供路由信息,用於管理Broker節點信息,記錄Broker與Topic的對應關係
2. ConsumerGroup:消費同一類消息的多個 consumer 實例組成一個消費者組
3. Topic:消息的邏輯分類,物理實現上,一個Topic由多個隊列組成
4. Message:消息,指定topic,有消息內容
5. Tag:標籤,是對Topic的進一步細化,能夠用來過濾
6. Broker:消息服務器,就是MQ,分爲Master和Slave節點,每一個Broker與全部NameServer集羣中全部節點創建鏈接,定時註冊Topic到全部的NameServer
7. Producer與一個NameServer創建長鏈接,按期從NameServer獲取Topic信息,向Broker發送消息
8. Consumer與一個NameServer創建長鏈接,按期從NameServer獲取Topic信息,從Broker消費消息
4、特性
1. 發佈/訂閱,點對點(P2P)
2. 消息優先級:Rocket MQ沒有特地支持消息優先級,但能夠配置優先級不一樣的兩個隊列
3. 消息順序:Rocket MQ嚴格保證消息順序,先進先出
4. 消息過濾:生產端和消費端均可以過濾,各有優缺點
5. 消息持久化:Rocket MQ以文件形式持久化
6. 消息可靠性:避免消息丟失,須要生產者、消費者和MQ隊列都保證
7. 消息延遲:Rocket MQ使用長輪詢pull方式,保證明時
8. 消息堆積:由於須要削峯填谷,須要支持消息堆積,億級別的消息堆積能力
9. 消息重試:消費失敗後,從新再消費一次
10. 每一個消息必須投遞一次
11. 不運行重複的消息,須要業務保證冪等
12. 隊列大小,按期刪除數據
13. 定時消息
14. 事務機制
5、消費者的消費模式
1. 集羣消費:一條消息只會被投遞到一個consumer group下面的一個實例
2. 廣播消費:一條消息會被投遞到一個consumer group下面的全部實例
6、消費者獲取消息的模式
1. 推送模式:能及時消費
2. 拉取模式:能夠主動控制拉取時機
7、Rocket MQ和其餘消息隊列比較(ActiveMQ, RabbitMQ, ZeroMQ, Kafka),爲何選擇Rocket MQ?
1. 嚴格的順序消息
2. 億級消息堆積能力
3. Pull/Push消費模式
4. 歷經屢次天貓雙十一海量消息考驗
參考:
http://www.javashuo.com/article/p-ektqdoib-ke.html
http://www.javashuo.com/article/p-qfeldccx-kg.html
https://blog.csdn.net/tototuzuoquan/article/details/78325192