1:特色前端
1:支持集羣模型,強調集羣無單點,負載均衡以及水平擴展能力
2:億級別的消息堆積能力
3:採用零拷貝原理 順序寫盤隨機讀
4:豐富的api
5:底層通訊框架採用netty nio
6: nameserver 代替zookpeer
7:消息失敗重試機制,消息可查詢
靈活可擴展性
RocketMQ 自然支持集羣,其核心四組件(Name Server、Broker、Producer、Consumer)每個均可以在沒有單點故障的狀況下進行水平擴展。java
海量消息堆積能力
RocketMQ 採用零拷貝原理實現超大的消息的堆積能力,聽說單機已能夠支持億級消息堆積,並且在堆積了這麼多消息後依然保持寫入低延遲。nginx
支持順序消息
能夠保證消息消費者按照消息發送的順序對消息進行消費。順序消息分爲全局有序和局部有序,通常推薦使用局部有序,即生產者經過將某一類消息按順序發送至同一個隊列來實現。git
多種消息過濾方式
消息過濾分爲在服務器端過濾和在消費端過濾。服務器端過濾時能夠按照消息消費者的要求作過濾,優勢是減小沒必要要消息傳輸,缺點是增長了消息服務器的負擔,實現相對複雜。消費端過濾則徹底由具體應用自定義實現,這種方式更加靈活,缺點是不少無用的消息會傳輸給消息消費者。github
支持事務消息
RocketMQ 除了支持普通消息,順序消息以外還支持事務消息,這個特性對於分佈式事務來講提供了又一種解決思路。redis
回溯消費
回溯消費是指消費者已經消費成功的消息,因爲業務上需求須要從新消費,RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,能夠向前回溯,也能夠向後回溯。
算法
下面是一張 RocketMQ 的部署結構圖,裏面涉及了 RocketMQ 核心的四大組件:Name Server、Broker、Producer、Consumer ,每一個組件均可以部署成集羣模式進行水平擴展。sql
同步發送
同步發送指消息發送方發出數據後會在收到接收方發回響應以後才發下一個數據包。通常用於重要通知消息,例如重要通知郵件、營銷短信。shell
異步發送
異步發送指發送方發出數據後,不等接收方發回響應,接着發送下個數據包,通常用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如用戶視頻上傳後通知啓動轉碼服務。數據庫
單向發送
單向發送是指只負責發送消息而不等待服務器迴應且沒有回調函數觸發,適用於某些耗時很是短但對可靠性要求並不高的場景,例如日誌收集。
生產者組
生產者組(Producer Group)是一類 Producer 的集合,這類 Producer 一般發送一類消息而且發送邏輯一致,因此將這些 Producer 分組在一塊兒。從部署結構上看生產者經過 Producer Group 的名字來標記本身是一個集羣。
消費者
消費者(Consumer)負責消費消息,消費者從消息服務器拉取信息並將其輸入用戶應用程序。站在用戶應用的角度消費者有兩種類型:拉取型消費者、推送型消費者。
拉取型消費者
拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啓動消費過程,因此 Pull 稱爲主動消費型。
推送型消費者
推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其餘的內部維護工做,將消息到達時執行的回調接口留給用戶應用程序來實現。因此 Push 稱爲被動消費類型,但從實現上看仍是從消息服務器中拉取消息,不一樣於 Pull 的是 Push 首先要註冊消費監聽器,當監聽器處觸發後纔開始消費消息。
消費者組
消費者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 一般消費同一類消息而且消費邏輯一致,因此將這些 Consumer 分組在一塊兒。消費者組與生產者組相似,都是將相同角色的分組在一塊兒並命名,分組是個很精妙的概念設計,RocketMQ 正是經過這種分組機制,實現了自然的消息負載均衡。消費消息時經過 Consumer Group 實現了將消息分發到多個消費者服務器實例,好比某個 Topic 有9條消息,其中一個 Consumer Group 有3個實例(3個進程或3臺機器),那麼每一個實例將均攤3條消息,這也意味着咱們能夠很方便的經過加機器來實現水平擴展。
消息服務器
消息服務器(Broker)是消息存儲中心,主要做用是接收來自 Producer 的消息並存儲, Consumer 從這裏取得消息。它還存儲與消息相關的元數據,包括用戶組、消費進度偏移量、隊列信息等。從部署結構圖中能夠看出 Broker 有 Master 和 Slave 兩種類型,Master 既能夠寫又能夠讀,Slave 不能夠寫只能夠讀。從物理結構上看 Broker 的集羣部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。
單 Master
這種方式一旦 Broker 重啓或宕機會致使整個服務不可用,這種方式風險較大,因此顯然不建議線上環境使用。
多 Master
全部消息服務器都是 Master ,沒有 Slave 。這種方式優勢是配置簡單,單個 Master 宕機或重啓維護對應用無影響。缺點是單臺機器宕機期間,該機器上未被消費的消息在機器恢復以前不可訂閱,消息實時性會受影響。
多 Master 多 Slave(異步複製)
每一個 Master 配置一個 Slave,因此有多對 Master-Slave,消息採用異步複製方式,主備之間有毫秒級消息延遲。這種方式優勢是消息丟失的很是少,且消息實時性不會受影響,Master 宕機後消費者能夠繼續從 Slave 消費,中間的過程對用戶應用程序透明,不須要人工干預,性能同多 Master 方式幾乎同樣。缺點是 Master 宕機時在磁盤損壞狀況下會丟失極少許消息。
多 Master 多 Slave(同步雙寫)
每一個 Master 配置一個 Slave,因此有多對 Master-Slave ,消息採用同步雙寫方式,主備都寫成功才返回成功。這種方式優勢是數據與服務都沒有單點問題,Master 宕機時消息無延遲,服務與數據的可用性很是高。缺點是性能相對異步複製方式略低,發送消息的延遲會略高。
名稱服務器
名稱服務器(NameServer)用來保存 Broker 相關元信息並給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設計成幾乎無狀態的,能夠橫向擴展,節點之間相互之間無通訊,經過部署多臺機器來標記本身是一個僞集羣。每一個 Broker 在啓動的時候會到 NameServer 註冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。因此從功能上看應該是和 ZooKeeper 差很少,聽說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,後來改成了本身實現的 NameServer 。
消息
消息(Message)就是要傳輸的信息。一條消息必須有一個主題(Topic),主題能夠看作是你的信件要郵寄的地址。一條消息也能夠擁有一個可選的標籤(Tag)和額處的鍵值對,它們能夠用於設置一個業務 key 並在 Broker 上查找此消息以便在開發期間查找問題。
主題
主題(Topic)能夠看作消息的規類,它是消息的第一級類型。好比一個電商系統能夠分爲:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產者和消費者的關係很是鬆散,一個 Topic 能夠有0個、1個、多個生產者向其發送消息,一個生產者也能夠同時向不一樣的 Topic 發送消息。一個 Topic 也能夠被 0個、1個、多個消費者訂閱。
標籤
標籤(Tag)能夠看做子主題,它是消息的第二級類型,用於爲用戶提供額外的靈活性。使用標籤,同一業務模塊不一樣目的的消息就能夠用相同 Topic 而不一樣的 Tag 來標識。好比交易消息又能夠分爲:交易建立消息、交易完成消息等,一條消息能夠沒有 Tag 。標籤有助於保持您的代碼乾淨和連貫,而且還能夠爲 RocketMQ 提供的查詢系統提供幫助。
消息隊列
消息隊列(Message Queue),主題被劃分爲一個或多個子主題,即消息隊列。一個 Topic 下能夠設置多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的全部隊列將消息發出去。下圖 Broker 內部消息狀況:
Broker 內部消息
消息消費模式
消息消費模式有兩種:集羣消費(Clustering)和廣播消費(Broadcasting)。默認狀況下就是集羣消費,該模式下一個消費者集羣共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,若是某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。而廣播消費消息會發給消費者組中的每個消費者進行消費。
消息順序
消息順序(Message Order)有兩種:順序消費(Orderly)和並行消費(Concurrently)。順序消費表示消息消費的順序同生產者爲每一個消息隊列發送的順序一致,因此若是正在處理全局順序是強制性的場景,須要確保使用的主題只有一個消息隊列。並行消費再也不保證消息順序,消費的最大並行數量受每一個消費者客戶端指定的線程池限制。
工程實例
3:
調整
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
</plugin> 的位置
按照官方文檔編譯軟件包生成apache-rocketmq.tar.gz
4:在vi/etc/hosts Hosts添加信息
192.168.11.128 rocketmq-nameserver1
192.168.11.128 rocketmq-master1
上傳文件
建立存儲路徑
配置文件
修改日誌配置文
修改啓動腳本參
啓動NameServer
啓動BrokerServer A
5:RocketMQ-Console
進入rocketmq-externals項目GitHub地址,以下圖,可看到RocketMQ項目的諸多擴展項目,其中就包含咱們須要下載的rocketmq-console。
進入項目文件夾並修改配置文件
將項目打成jar包,並運行jar文件。
$ mvn clean package -Dmaven.test.skip=true $ java -jar target/rocketmq-console-ng-1.0.0.jar #若是配置文件沒有填寫Name Server $ java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='10.0.74.198:9876;10.0.74.199:9876'
啓動成功後,訪問地址http://localhost:8080/rocketmq, 便可進入管理後臺操做。
6:引入依賴包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
product 端代碼
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
producer.start();
for(int i = 0 ; i <5; i ++)
{
// 1. 建立消息
Message
message = new Message("test_quick_topic", // 主題
"TagA", // 標籤
"key" + i, // 用戶自定義的key ,惟一的標識
("Hello RocketMQ" + i).getBytes()); // 消息內容實體(byte[])
SendResult sr = producer.send(message);
System.err.println("消息發出: " + sr);
}
producer.shutdown();
}}
7:consumer端代碼
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("test_quick_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try {
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
// if(keys.equals("key1")) {
// System.err.println("消息消費失敗..");
// int a = 1/0;
// }
String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
} catch (Exception e) {
e.printStackTrace();
// int recousumeTimes = me.getReconsumeTimes();
// System.err.println("recousumeTimes: " + recousumeTimes);
// if(recousumeTimes == 3) {
// // 記錄日誌....
// // 作補償處理
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.err.println("consumer start...");
}
}
亦能夠這樣寫
consumer.registerMessageListener(new Listener());
class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagB")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
8:集羣類型
單節點
主從
雙主
雙主雙從
多主多從
主節點消息收發,同步到從節點,主節點掛了,從節點能夠保證消息不丟失
投遞一條消息後,關閉主節點,故障演練,數據一致性可否保證
從節點能夠繼續提供消費者繼續消費,不能接收消息
主節點從新上線後進行消費進度的offset同步
9:rocketmq服務關閉
關閉namesrv服務:sh bin/mqshutdown namesrv
關閉broker服務 :sh bin/mqshutdown broker
10:rocketmq生產者核心參數講解//執行 jps 查看進程 > jps 25913 NamesrvStartup
producerGroup*
配置說明:生產組的名稱,一類Producer的標識
createTopicKey
配置說明:發送消息的時候,若是沒有找到topic,若想自動建立該topic,須要一個key topic,這個值便是key topic的值
defaultTopicQueueNums
配置說明:自動建立topic的話,默認queue數量是多少
默認值:4
sendMsgTimeout
配置說明:默認的發送超時時間 3000
默認值:單位毫秒
若發送的時候不顯示指定timeout,則使用此設置的值做爲超時時間。
對於異步發送,超時後會進入回調的onException,對於同步發送,超時則會獲得一個RemotingTimeoutException。
compressMsgBodyOverHowmuch
配置說明:消息body須要壓縮的閾值
默認值:1024 * 4,4K
retryTimesWhenSendFailed
配置說明:同步發送失敗的話,rocketmq內部重試多少次,默認值:2
retryTimesWhenSendAsyncFailed
配置說明:異步發送失敗的話,rocketmq內部重試多少次,默認值:2
maxMessageSize
配置說明:客戶端驗證,容許發送的最大消息體大小
默認值:1024 * 1024 * 4,4M
若消息體大小超過此,會獲得一個響應碼13(MESSAGE_ILLEGAL)的MQClientException異常
retryAnotherBrokerWhenNotStoreOK
product和consumer在啓動的時候會經過nameserver 拉取元數據信息
因此發送大於配置的消息體的時候實際上是在生產端就會被拒絕掉的
配置說明:發送的結果若是不是SEND_OK狀態,是否看成失敗處理而嘗試重發
默認值:false
發送結果總共有4鍾:
SEND_OK, //狀態成功,不管同步仍是存儲
FLUSH_DISK_TIMEOUT, // broker刷盤策略爲同步刷盤(SYNC_FLUSH)的話時候,等待刷盤的時候超時
FLUSH_SLAVE_TIMEOUT, // master role採起同步複製策略(SYNC_MASTER)的時候,消息嘗試同步到slave超時
SLAVE_NOT_AVAILABLE, //slave不可用
注:從源碼上看,此配置項只對同步發送有效,異步、oneway(因爲沒法獲取結果,確定無效)均無效
11:master slave 主從同步機制
主要同步的是元數據和消息兩大實體
元數據採用的是定時任務同步底層採用netty技術實現
消息commitlog主要是採用實時同步底層採用的時候socket
12:延遲消息
RocketMQ 支持發送延遲消息,但不支持任意時間的延遲消息的設置,
僅支持內置預設值的延遲時間間隔的延遲消息。
預設值的延遲時間間隔爲:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
在消息建立的時候,調用 setDelayTimeLevel(int level) 方法設置延遲時間。
broker在接收到延遲消息的時候會把對應延遲級別的消息先存儲到對應的延遲隊列中,
等延遲消息時間到達時,會把消息從新存儲到對應的topic的queue裏面。
(1)延遲消息正常提交給 CommitLog 保存
(2)由於是延遲消息,因此變動爲延時隊列指定的 Topic 和 queueId,這樣就轉換爲 ConsumerQueue(Scheduler),從而不會像 ConsumerQueue(Normal)被正常消費
(3)延時隊列調度器,輪詢查看相應的隊列中消息是否到了要執行的時間
(4)到了執行時間的消息,恢復原來消息的 topic 和 queueId,發給 broker 就變爲 ConsumerQueue(nornal)。這樣就能正常消費了
使用了 Level 的方式,不一樣時間放進不一樣 queue,這樣就避免了排序問題,成爲了一個 O(1) 的隊列插入
13:發送給指定的隊列
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
14:consumer核心參數配置
consumeFromWhere*
配置說明:啓動消費點策略
默認值:ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
可選值有三個:
CONSUME_FROM_LAST_OFFSET //隊列尾消費
CONSUME_FROM_FIRST_OFFSET //隊列頭消費
CONSUME_FROM_TIMESTAMP //按照日期選擇某個位置消費
注:此策略只生效於新在線測consumer group,若是是老的已存在的consumer group,broker已經上報過了 offset
都降按照已經持久化的consume offset進行消費
consumeTimestamp:
配置說明:CONSUME_FROM_LAST_OFFSET的時候使用,從哪一個時間點開始消費
默認值:半小時前
格式爲yyyyMMddhhmmss 如 20131223171201
allocateMessageQueueStrategy*
配置說明:負載均衡策略算法
默認值:AllocateMessageQueueAveragely(取模平均分配)
這個算法能夠自行擴展以使用自定義的算法,目前內置的有如下算法可使用
AllocateMessageQueueAveragely //取模平均
AllocateMessageQueueAveragelyByCircle //環形平均
AllocateMessageQueueByConfig // 按照配置,傳入聽死的messageQueueList
AllocateMessageQueueByMachineRoom //按機房,從源碼上看,必須和阿里的某些broker命名一致才行
AllocateMessageQueueConsistentHash //一致性哈希算法,本人於4.1提交的特性。用於解決「驚羣效應」。
須要自行擴展的算法的,須要實現org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueStrategy
subscription
配置說明:訂閱關係(topic->sub expression)
messageListener
配置說明:消息處理監聽器(回調)
默認值:null
不建議設置,註冊監聽的時候應調用registerMessageListener
offsetStore
配置說明:消息消費進度存儲器
默認值:null
不建議設置,offsetStore 有兩個策略:LocalFileOffsetStore 和 RemoteBrokerOffsetStore。
若沒有顯示設置的狀況下,廣播模式將使用LocalFileOffsetStore,集羣模式將使用RemoteBrokerOffsetStore,不建議修改。
consumeThreadMin*
配置說明:消費線程池的core size
默認值:20
PushConsumer會內置一個消費線程池,這個配置控制此線程池的core size
consumeThreadMax*
配置說明:消費線程池的max size
默認值:64
PushConsumer會內置一個消費線程池,這個配置控制此線程池的max size
messageModel*
配置說明:消費模式
默認值:MessageModel.CLUSTERING
可選值有兩個:
CLUSTERING //集羣消費模式
BROADCASTING //廣播消費模式
consumeConcurrentlyMaxSpan
配置說明:併發消費下,單條consume queue隊列容許的最大offset跨度,達到則觸發流控
默認值:2000
注:只對併發消費(ConsumeMessageConcurrentlyService)生效
每次發起pull請求到broker,客戶端須要指定一個最大batch size,表示此次拉取消息最多批量拉取多少條。
consumeMessageBatchMaxSize
配置說明:批量消費的最大消息條數
默認值:1
consumeTimeout
配置說明:消費的最長超時時間
默認值:15,單位分鐘
若是消費超時,RocketMQ會等同於消費失敗來處理
consumerGroup*
配置說明:消費組的名稱,用於標識一類消費者,
用於把多個consumer組織到一塊兒,消費某一類型的消費達到自然的負載均衡
15:RocketMQ有兩種消費模式
RocketMQ有兩種消費模式:BROADCASTING廣播模式,CLUSTERING集羣模式,默認的是 集羣消費模式。
廣播消費指的是:一條消息被多個consumer消費,即便這些consumer屬於同一個ConsumerGroup,消息也會被ConsumerGroup中的每一個Consumer都消費一次
集羣消費模式:一個ConsumerGroup中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,
其中一個ConsumerGroup有3個實例(多是3個進程,或者3臺機器),那麼每一個實例只消費其中部分,消費完的消息不能被其餘實例消費。
16:
offset是消息消費進度的核心,指的是某個topic下的某一條消息在messagequeque中的位置
經過offset能夠定位到這條消息
offset的存儲能夠分爲遠程文件存儲和本地文件存儲
默認的集羣模式採用遠程文件存儲 本質上是多個consumer消費某一個主題,這種狀況須要broker控制offset,
使用rmotebrokeroffsetstore
廣播模式採用本地文件存儲,每一個consumer相互獨立沒任何干擾,因此能夠把offset存儲在本地
17:MQ中Pull和Push的兩種消費方式
對於任何一款消息中間件而言,消費者客戶端通常有兩種方式從消息中間件獲取消息並消費:
(1)Push方式:由消息中間件(MQ消息服務器代理)主動地將消息推送給消費者;採用Push方式,
能夠儘量實時地將消息發送給消費者進行消費。可是,在消費者的處理消息的能力較弱的時候(好比,消費者端的業務系統處理一條消息的流程比較複雜,
其中的調用鏈路比較多致使消費時間比較久。歸納起來地說就是「慢消費問題」),
而MQ不斷地向消費者Push消息,消費者端的緩衝區可能會溢出,致使異常;
(2)Pull方式:由消費者客戶端主動向消息中間件(MQ消息服務器代理)拉取消息;採用Pull方式,
如何設置Pull消息的頻率須要重點去考慮,舉個例子來講,可能1分鐘內連續來了1000條消息,而後2小時內沒有新消息產生(歸納起來講就是「消息延遲與忙等待」)。
若是每次Pull的時間間隔比較久,會增長消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,可是在一段時間內MQ中並無任何消息能夠消費,那麼會產生不少無效的Pull請求的RPC開銷,影響MQ總體的網絡性能;
1.2 RocketMQ消息消費的長輪詢機制
思考題:
上面簡要說明了Push和Pull兩種消息消費方式的概念和各自特色。若是長時間沒有消息,而消費者端又不停的發送Pull請求不就會致使RocketMQ中Broker端負載很高嗎?那麼在RocketMQ中如何解決以作到高效的消息消費呢?
經過研究源碼可知,RocketMQ的消費方式都是基於拉模式拉取消息的,而在這其中有一種長輪詢機制(對普通輪詢的一種優化),來平衡上面Push/Pull模型的各自缺點。基本設計思路是:消費者若是第一次嘗試Pull消息失敗(好比:Broker端沒有能夠消費的消息),
並不當即給消費者客戶端返回Response的響應,而是先hold住而且掛起請求(將請求保存至pullRequestTable本地緩存變量中),而後Broker端的後臺獨立線程—PullRequestHoldService會從pullRequestTable本地緩存變量中不斷地去取,
具體的作法是查詢待拉取消息的偏移量是否小於消費隊列最大偏移量,若是條件成立則說明有新消息達到Broker端(這裏,在RocketMQ的Broker端會有一個後臺獨立線程—ReputMessageService不停地構建ConsumeQueue/IndexFile數據,同時取出hold住的請求並進行二次處理),則經過從新調用一次業務處理器—PullMessageProcessor的處理請求方法—processRequest()來從新嘗試拉取消息(此處,每隔5S重試一次,默認長輪詢總體的時間設置爲30s)。
RocketMQ消息Pull的長輪詢機制的關鍵在於Broker端的PullRequestHoldService和ReputMessageService兩個後臺線程。對於RocketMQ的長輪詢(LongPolling)消費模式後面會專門詳細介紹。
18:消息存儲
RocketMQ的消息存儲是由consume queue和commit log配合完成的。
一、Consume Queue
consume queue是消息的邏輯隊列,至關於字典的目錄,用來指定消息在物理文件commit log上的位置。
咱們能夠在配置中指定consumequeue與commitlog存儲的目錄
每一個topic下的每一個queue都有一個對應的consumequeue文件,好比:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
1.根據topic和queueId來組織文件,圖中TopicA有兩個隊列0,1,那麼TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另外一個ConsumeQueue。
Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀
consumequeue文件存儲單元格式
CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量
Size存儲中消息的大小
Message Tag HashCode存儲消息的Tag的哈希值:主要用於訂閱時消息過濾(訂閱時若是指定了Tag,會根據HashCode來快速查找到訂閱的消息)
二、Commit Log
CommitLog:消息存放的物理文件,每臺broker上的commitlog被本機全部的queue共享,不作任何區分。
文件的默認位置以下,仍然可經過配置文件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構以下表所示,按照編號順序以及編號對應的內容依次存儲。
三、消息的索引文件
若是一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:
消息索引
索引文件主要用於根據key來查詢消息的
19:異步複製,同步雙寫
異步複製和同步雙寫主要是主和從的關係。消息須要實時消費的,就須要採用主從模式部署
異步複製:好比這裏有一主一從,咱們發送一條消息到主節點以後,這樣消息就算從producer端發送成功了,而後經過異步複製的方法將數據複製到從節點
同步雙寫:好比這裏有一主一從,咱們發送一條消息到主節點以後,這樣消息就並不算從producer端發送成功了,須要經過同步雙寫的方法將數據同步到從節點後, 纔算數據發送成功。
同步刷盤:在消息到達MQ後,RocketMQ須要將數據持久化,同步刷盤是指數據到達內存以後,必須刷到commitlog日誌以後纔算成功,而後返回producer數據已經發送成功。
異步刷盤:,同步刷盤是指數據到達內存以後,返回producer說數據已經發送成功。,而後再寫入commitlog日誌。
commitlog
commitlog就是來存儲全部的元信息,包含消息體,相似於Mysql、Oracle的redolog,因此主要有CommitLog在,Consume Queue即便數據丟失,仍然能夠恢復出來。
consumequeue:記錄數據的位置,以便Consume快速經過consumequeue找到commitlog中的數據
20:RocketMQ 高可用機制
master slave 配合,master 支持讀、寫,slave 只讀,producer 只能和 master 鏈接寫入消息,consumer 能夠鏈接 master 和 slave。
consumer 高可用
當 master 不可用或者繁忙時,consumer 會被自動切換到 slave 讀。因此,即便 master 出現故障,consumer 仍然能夠從 slave 讀消息,不受影響。
producer 高可用
建立 topic 時,把 message queue 建立在多個 broker 組上(brokerName 同樣,brokerId 不一樣),當一個 broker 組的 master 不可用後,其餘組的 master 仍然能夠用,producer 能夠繼續發消息。
21:namesrv存在乎義
是整個集羣的狀態服務器 nameserver部署相互獨立 zookpeer太笨重了 因此才使用nameserver來管理狀態
在RocketMQ網絡部署圖中,broker至關於服務端,而Producer、Consumer都是至關於其客戶端,若是broker固定死永遠不變,那麼namesrv存在就沒有任何同樣的,可是因爲服務端自動伸縮、故障以及升級等,服務端會變更,所以namesrv就有存在的意義了。
下面簡單說明:
所以須要一個相似namesrv的東西存在,通常存在兩種機制:客戶端發現機制和服務端發現機制。
當發出請求服務時,客戶端經過註冊中心服務知道全部的服務實例。客戶端接着使用負載均衡算法選擇可用的服務實例中的一個並進行發送。
發出請求服務時,客戶端經過請求負載平衡器,負載均衡器經過註冊中心服務知道全部的服務實例。負載均衡器接着使用負載均衡算法選擇可用的服務實例中的一個並進行發送。
備註: Nginx HTTP服務器和反向代理服務器就是這種。
客戶端發現機制:客戶端有全部可用的服務實例,能夠靈活方便的特定應用進行特定的負載均衡決策。
服務端發現機制:客戶端只須要給負載均衡器發請求便可,客戶端屏蔽掉了一些細節。
22:雙十一抗壓
前端dns解析,軟硬負載均衡設施進行分流,限流
lvs nginx haproxy負載均衡
openresty 防刷限流
緩存按照業務維度拆分
微服務流控
Guava RateLimiter,jdk Semaphore ,Netflix Hystrix
微服務熔斷 降級 兜底
微服務接口的冪等性保證
數據庫分庫分表策略
冷熱數據讀寫分離
23:
24:Dubbo,zookeeper與SpringBoot2.x進行實戰整合見代碼
25:hysrtix 降級代碼
超時降級
@HystrixCommand(
commandKey = "createOrder",
commandProperties = {
@HystrixProperty(name="execution.timeout.enabled", value="true"),
@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds", value="3000"),
},
fallbackMethod = "createOrderFallbackMethod4Timeout"
)
限流策略:線程池方式
@HystrixCommand(
commandKey = "createOrder",
commandProperties = {
@HystrixProperty(name="execution.isolation.strategy", value="THREAD")
},
threadPoolKey = "createOrderThreadPool",
threadPoolProperties = {
@HystrixProperty(name="coreSize", value="10"),
@HystrixProperty(name="maxQueueSize", value="20000"),
@HystrixProperty(name="queueSizeRejectionThreshold", value="30")
},
fallbackMethod="createOrderFallbackMethod4Thread"
)
限流策略:信號量方式
@HystrixCommand(
commandKey="createOrder",
commandProperties= {
@HystrixProperty(name="execution.isolation.strategy", value="SEMAPHORE"),
@HystrixProperty(name="execution.isolation.semaphore.maxConcurrentRequests", value="3")
},
fallbackMethod = "createOrderFallbackMethod4semaphore"
)
@RequestMapping("/createOrder")
public String createOrder(@RequestParam("cityId")String cityId,
@RequestParam("platformId")String platformId,
@RequestParam("userId")String userId,
@RequestParam("supplierId")String supplierId,
@RequestParam("goodsId")String goodsId) throws Exception {
return orderService.createOrder(cityId, platformId, userId, supplierId, goodsId) ? "下單成功!" : "下單失敗!";
}
public String createOrderFallbackMethod4Timeout(@RequestParam("cityId")String cityId,
@RequestParam("platformId")String platformId,
@RequestParam("userId")String userId,
@RequestParam("suppliedId")String suppliedId,
@RequestParam("goodsId")String goodsId) throws Exception {
System.err.println("-------超時降級策略執行------------");
return "hysrtix timeout !";
}
@Configuration
public class HystrixConfig {
// 用來攔截處理HystrixCommand註解
@Bean
public HystrixCommandAspect hystrixAspect() {
return new HystrixCommandAspect();
}
// 用來像監控中心Dashboard發送stream信息
@Bean
public ServletRegistrationBean hystrixMetricsStreamServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());
registration.addUrlMappings("/hystrix.stream");
return registration;
}
}
26:請求合併
27:分佈式事務
分佈式事務執行流程
product並行執行向broker發送消息和執行本地事務,發送到broker的消息爲不可見狀態
當本地事務執行成功後product會向broker發送一個成功確認消息,此時會把broker的消息更改成可見狀態,以供消費者消費
當本地事務執行失敗後product會向broker發送一個失敗確認消息,此時會把broker的消息更改成失敗狀態,broker會有個定時任務刪除失敗的消息
若是其中某個環節出現問題broker沒接收到product發送的確認消息,broker會間隔回調一個check回調函數,
以便查看product本地事務執行的結果來再次發送確認消息到broker
28:分佈式事務代碼
product端代碼
TransactionListenerImpl代碼
測試本地事務返回LocalTransactionState.UNKNOW 執行checkLocalTransaction代碼
消費端代碼
29:FastJsonConvertUtil對象轉換json工具類
public class FastJsonConvertUtil {
private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
/**
* <B>方法名稱:</B>將JSON字符串轉換爲實體對象<BR>
* <B>概要說明:</B>將JSON字符串轉換爲實體對象<BR>
* @param data JSON字符串
* @param clzss 轉換對象
* @return T
*/
public static <T> T convertJSONToObject(String data, Class<T> clzss) {
try {
T t = JSON.parseObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名稱:</B>將JSONObject對象轉換爲實體對象<BR>
* <B>概要說明:</B>將JSONObject對象轉換爲實體對象<BR>
* @param data JSONObject對象
* @param clzss 轉換對象
* @return T
*/
public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
try {
T t = JSONObject.toJavaObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名稱:</B>將JSON字符串數組轉爲List集合對象<BR>
* <B>概要說明:</B>將JSON字符串數組轉爲List集合對象<BR>
* @param data JSON字符串數組
* @param clzss 轉換對象
* @return List<T>集合對象
*/
public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
try {
List<T> t = JSON.parseArray(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名稱:</B>將List<JSONObject>轉爲List集合對象<BR>
* <B>概要說明:</B>將List<JSONObject>轉爲List集合對象<BR>
* @param data List<JSONObject>
* @param clzss 轉換對象
* @return List<T>集合對象
*/
public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
try {
List<T> t = new ArrayList<T>();
for (JSONObject jsonObject : data) {
t.add(convertJSONToObject(jsonObject, clzss));
}
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名稱:</B>將對象轉爲JSON字符串<BR>
* <B>概要說明:</B>將對象轉爲JSON字符串<BR>
* @param obj 任意對象
* @return JSON字符串
*/
public static String convertObjectToJSON(Object obj) {
try {
String text = JSON.toJSONString(obj);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名稱:</B>將對象轉爲JSONObject對象<BR>
* <B>概要說明:</B>將對象轉爲JSONObject對象<BR>
* @param obj 任意對象
* @return JSONObject對象
*/
public static JSONObject convertObjectToJSONObject(Object obj){
try {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
return jsonObject;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名稱:</B><BR>
* <B>概要說明:</B><BR>
* @param obj
* @return
*/
public static String convertObjectToJSONWithNullValue(Object obj) {
try {
String text = JSON.toJSONString(obj, featuresWithNullValue);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
30:實戰代碼
支付生產端代碼paya
TransactionProducer
@Component
public class TransactionProducer implements InitializingBean {
private TransactionMQProducer producer;
private ExecutorService executorService;
@Autowired
private TransactionListenerImpl transactionListenerImpl;
private static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
private static final String PRODUCER_GROUP_NAME = "tx_pay_producer_group_name";
private TransactionProducer() {
this.producer = new TransactionMQProducer(PRODUCER_GROUP_NAME);
this.executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(PRODUCER_GROUP_NAME + "-check-thread");
return thread;
}
});
this.producer.setExecutorService(executorService);
this.producer.setNamesrvAddr(NAMESERVER);
}
@Override
public void afterPropertiesSet() throws Exception {
this.producer.setTransactionListener(transactionListenerImpl);
start();
}
private void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void shutdown() {
this.producer.shutdown();
}
public TransactionSendResult sendMessage(Message message, Object argument) {
TransactionSendResult sendResult = null;
try {
sendResult = this.producer.sendMessageInTransaction(message, argument);
} catch (Exception e) {
e.printStackTrace();
}
return sendResult;
}
}
TransactionListenerImpl
@Component
public class TransactionListenerImpl implements TransactionListener {
@Autowired
private CustomerAccountMapper customerAccountMapper;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.err.println("執行本地事務單元------------");
CountDownLatch currentCountDown = null;
try {
Map<String, Object> params = (Map<String, Object>) arg;
String userId = (String)params.get("userId");
String accountId = (String)params.get("accountId");
String orderId = (String)params.get("orderId");
BigDecimal payMoney = (BigDecimal)params.get("payMoney"); // 當前的支付款
BigDecimal newBalance = (BigDecimal)params.get("newBalance"); // 前置扣款成功的餘額
int currentVersion = (int)params.get("currentVersion");
currentCountDown = (CountDownLatch)params.get("currentCountDown");
//updateBalance 傳遞當前的支付款 數據庫操做:
Date currentTime = new Date();
int count = this.customerAccountMapper.updateBalance(accountId, newBalance, currentVersion, currentTime);
if(count == 1) {
currentCountDown.countDown();
return LocalTransactionState.COMMIT_MESSAGE;
} else {
currentCountDown.countDown();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
e.printStackTrace();
currentCountDown.countDown();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// TODO Auto-generated method stub
return null;
}
}
支付流程代碼
@Service
public class PayServiceImpl implements PayService {
public static final String TX_PAY_TOPIC = "tx_pay_topic";
public static final String TX_PAY_TAGS = "pay";
@Autowired
private CustomerAccountMapper customerAccountMapper;
@Autowired
private TransactionProducer transactionProducer;
@Autowired
private CallbackService callbackService;
@Override
public String payment(String userId, String orderId, String accountId, double money) {
String paymentRet = "";
try {
// 最開始有一步 token驗證操做(重複提單問題)
BigDecimal payMoney = new BigDecimal(money);
//加鎖開始(獲取)
CustomerAccount old = customerAccountMapper.selectByPrimaryKey(accountId);
BigDecimal currentBalance = old.getCurrentBalance();
int currentVersion = old.getVersion();
// 要對大機率事件進行提早預判(小几率事件咱們作放過,可是最後保障數據的一致性便可)
//業務出發:
//當前一個用戶帳戶 只容許一個線程(一個應用端訪問)
//技術出發:
//1 redis去重 分佈式鎖
//2 數據庫樂觀鎖去重
// 作扣款操做的時候:得到分佈式鎖,看一下可否得到
BigDecimal newBalance = currentBalance.subtract(payMoney);
//加鎖結束(釋放)
if(newBalance.doubleValue() > 0 ) { // 或者一種狀況獲取鎖失敗
// 1.組裝消息
// 1.執行本地事務
String keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();
Map<String, Object> params = new HashMap<>();
params.put("userId", userId);
params.put("orderId", orderId);
params.put("accountId", accountId);
params.put("money", money); //100
Message message = new Message(TX_PAY_TOPIC, TX_PAY_TAGS, keys, FastJsonConvertUtil.convertObjectToJSON(params).getBytes());
// 可能須要用到的參數
params.put("payMoney", payMoney);
params.put("newBalance", newBalance);
params.put("currentVersion", currentVersion);
// 同步阻塞
CountDownLatch countDownLatch = new CountDownLatch(1);
params.put("currentCountDown", countDownLatch);
// 消息發送而且 本地的事務執行
TransactionSendResult sendResult = transactionProducer.sendMessage(message, params);
countDownLatch.await();
if(sendResult.getSendStatus() == SendStatus.SEND_OK
&& sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
// 回調order通知支付成功消息
callbackService.sendOKMessage(orderId, userId);
paymentRet = "支付成功!";
} else {
paymentRet = "支付失敗!";
}
} else {
paymentRet = "餘額不足!";
}
} catch (Exception e) {
e.printStackTrace();
paymentRet = "支付失敗!";
}
return paymentRet;
}
}
支付消費端代碼payb
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
private static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
private static final String CONSUMER_GROUP_NAME = "tx_pay_consumer_group_name";
public static final String TX_PAY_TOPIC = "tx_pay_topic";
public static final String TX_PAY_TAGS = "pay";
@Autowired
private PlatformAccountMapper platformAccountMapper;
private PayConsumer() {
try {
this.consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);
this.consumer.setConsumeThreadMin(10);
this.consumer.setConsumeThreadMax(30);
this.consumer.setNamesrvAddr(NAMESERVER);
this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
this.consumer.subscribe(TX_PAY_TOPIC, TX_PAY_TAGS);
this.consumer.registerMessageListener(new MessageListenerConcurrently4Pay());
this.consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
class MessageListenerConcurrently4Pay implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("收到事務消息, topic: " + topic + ", tags: " + tags + ", keys: " + keys + ", body: " + body);
// 消息一單過來的時候(去重 冪等操做)
// 數據庫主鍵去重<去重表 keys>
// insert table --> insert ok & primary key
Map<String, Object> paramsBody = FastJsonConvertUtil.convertJSONToObject(body, Map.class);
String userId = (String)paramsBody.get("userId"); // customer userId
String accountId = (String)paramsBody.get("accountId"); //customer accountId
String orderId = (String)paramsBody.get("orderId"); // 統一的訂單
BigDecimal money = (BigDecimal)paramsBody.get("money"); // 當前的收益款
PlatformAccount pa = platformAccountMapper.selectByPrimaryKey("platform001"); // 當前平臺的一個帳號
pa.setCurrentBalance(pa.getCurrentBalance().add(money));
Date currentTime = new Date();
pa.setVersion(pa.getVersion() + 1);
pa.setDateTime(currentTime);
pa.setUpdateTime(currentTime);
platformAccountMapper.updateByPrimaryKeySelective(pa);
} catch (Exception e) {
e.printStackTrace();
//msg.getReconsumeTimes();
// 若是處理屢次操做仍是失敗, 記錄失敗日誌(作補償 回顧 人工處理)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
當支付成功 回調訂單修改訂單狀態代碼
paya代碼(所屬)
@Service
public class CallbackService {
public static final String CALLBACK_PAY_TOPIC = "callback_pay_topic";
public static final String CALLBACK_PAY_TAGS = "callback_pay";
public static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
@Autowired
private SyncProducer syncProducer;
public void sendOKMessage(String orderId, String userId) {
Map<String, Object> params = new HashMap<>();
params.put("userId", userId);
params.put("orderId", orderId);
params.put("status", "2"); //ok
String keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();
Message message = new Message(CALLBACK_PAY_TOPIC, CALLBACK_PAY_TAGS, keys, FastJsonConvertUtil.convertObjectToJSON(params).getBytes());
SendResult ret = syncProducer.sendMessage(message);
}
}
發送修改訂單狀態消息的代碼
paya代碼(所屬)
@Component
public class SyncProducer {
private DefaultMQProducer producer;
private static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
private static final String PRODUCER_GROUP_NAME = "callback_pay_producer_group_name";
private SyncProducer() {
this.producer = new DefaultMQProducer(PRODUCER_GROUP_NAME);
this.producer.setNamesrvAddr(NAMESERVER);
this.producer.setRetryTimesWhenSendFailed(3);
start();
}
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public SendResult sendMessage(Message message) {
SendResult sendResult = null;
try {
sendResult = this.producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
public void shutdown() {
this.producer.shutdown();
}
}
消費端修改訂單狀態的代碼
order端代碼
@Component
public class OrderConsumer {
private DefaultMQPushConsumer consumer;
public static final String CALLBACK_PAY_TOPIC = "callback_pay_topic";
public static final String CALLBACK_PAY_TAGS = "callback_pay";
public static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderService orderService;
public OrderConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("callback_pay_consumer_group");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(50);
consumer.setNamesrvAddr(NAMESERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(CALLBACK_PAY_TOPIC, CALLBACK_PAY_TAGS);
consumer.registerMessageListener(new MessageListenerConcurrently4Pay());
consumer.start();
}
class MessageListenerConcurrently4Pay implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
try {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.err.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + "keys :" + keys + ", msg : " + msgBody);
String orignMsgId = msg.getProperties().get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
System.err.println("orignMsgId: " + orignMsgId);
//經過keys 進行去重表去重 或者使用redis進行去重???? --> 不須要
Map<String, Object> body = FastJsonConvertUtil.convertJSONToObject(msgBody, Map.class);
String orderId = (String) body.get("orderId");
String userId = (String) body.get("userId");
String status = (String)body.get("status");
Date currentTime = new Date();
if(status.equals(OrderStatus.ORDER_PAYED.getValue())) {
int count = orderMapper.updateOrderStatus(orderId, status, "admin", currentTime);
if(count == 1) {
orderService.sendOrderlyMessage4Pkg(userId, orderId);
}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
31:順序消息
順序消息是指消息的消費順序和消費的生產順序相同
全局順序:在某個topic下的全部消息都保持順序
局部順序:只保證某一個隊列queue是按順序的便可,這樣能夠有多個隊列queue同時消費提升併發
順序消費場景
在網購的時候,咱們須要下單,那麼下單須要假若有三個順序,第1、建立訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單纔有意義。RocketMQ能夠保證順序消費。
rocketMq實現順序消費的原理
produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者註冊消息監聽器爲MessageListenerOrderly,這樣就能夠保證消費端只有一個線程去消費消息
注意:是把把消息發到同一個隊列(queue),不是同一個topic,默認狀況下一個topic包括4個queue
順序消費代碼
public static final String PKG_TOPIC = "pkg_topic";
public static final String PKG_TAGS = "pkg";
@Override
public void sendOrderlyMessage4Pkg(String userId, String orderId) {
List<Message> messageList = new ArrayList<>();
Map<String, Object> param1 = new HashMap<String, Object>();
param1.put("userId", userId);
param1.put("orderId", orderId);
param1.put("text", "建立包裹操做---1");
String key1 = UUID.randomUUID().toString() + "$" +System.currentTimeMillis();
Message message1 = new Message(PKG_TOPIC, PKG_TAGS, key1, FastJsonConvertUtil.convertObjectToJSON(param1).getBytes());
messageList.add(message1);
Map<String, Object> param2 = new HashMap<String, Object>();
param2.put("userId", userId);
param2.put("orderId", orderId);
param2.put("text", "發送物流通知操做---2");
String key2 = UUID.randomUUID().toString() + "$" +System.currentTimeMillis();
Message message2 = new Message(PKG_TOPIC, PKG_TAGS, key2, FastJsonConvertUtil.convertObjectToJSON(param2).getBytes());
messageList.add(message2);
// 順序消息投遞 是應該按照 供應商ID 與topic 和 messagequeueId 進行綁定對應的
// supplier_id
Order order = orderMapper.selectByPrimaryKey(orderId);
int messageQueueNumber = Integer.parseInt(order.getSupplierId());
//對應的順序消息的生產者 把messageList 發出去
orderlyProducer.sendOrderlyMessages(messageList, messageQueueNumber);
}
生產端代碼
@Component
public class OrderlyProducer {
private DefaultMQProducer producer;
public static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
public static final String PRODUCER_GROUP_NAME = "orderly_producer_group_name";
private OrderlyProducer() {
this.producer = new DefaultMQProducer(PRODUCER_GROUP_NAME);
this.producer.setNamesrvAddr(NAMESERVER);
this.producer.setSendMsgTimeout(3000);
start();
}
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void shutdown() {
this.producer.shutdown();
}
public void sendOrderlyMessages(List<Message> messageList, int messageQueueNumber) {
for(Message me : messageList) {
try {
this.producer.send(me, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer)arg;
return mqs.get(id);
}
}, messageQueueNumber);
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MQBrokerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
消費端代碼
@Component
public class PkgOrderlyConsumer {
private DefaultMQPushConsumer consumer;
public static final String PKG_TOPIC = "pkg_topic";
public static final String PKG_TAGS = "pkg";
public static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
public static final String CONSUMER_GROUP_NAME = "orderly_consumer_group_name";
private PkgOrderlyConsumer() throws MQClientException {
this.consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);
this.consumer.setConsumeThreadMin(10);
this.consumer.setConsumeThreadMin(30);
this.consumer.setNamesrvAddr(NAMESERVER);
this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
this.consumer.subscribe(PKG_TOPIC, PKG_TAGS);
this.consumer.setMessageListener(new PkgOrderlyListener());
this.consumer.start();
}
class PkgOrderlyListener implements MessageListenerOrderly {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for(MessageExt msg: msgs) {
try {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.err.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + "keys :" + keys + ", msg : " + msgBody);
Map<String, Object> body = FastJsonConvertUtil.convertJSONToObject(msgBody, Map.class);
String orderId = (String) body.get("orderId");
String userId = (String) body.get("userId");
String text = (String)body.get("text");
// 模擬實際的業務耗時操做
// PS: 建立包裹信息 、對物流的服務調用(異步調用)
TimeUnit.SECONDS.sleep(random.nextInt(3) + 1);
System.err.println("業務操做: " + text);
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
32:rocketmq消息過濾機制
使用tag進行消息過濾 在broker端過濾
使用sql表達式進行消息過濾
使用filter server進行消息過濾 用cpu資源換取網卡流量在broker端過濾 最新版本不支持 比sql更爲靈活
tag 能夠幫助咱們方便的選擇咱們想要的消息,例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
但 tag 有個限制,一個消息只能設置一個tag,在某些場景下這就很不方便了,這時就可使用 filter,rocketmq 支持使用 sql 語句的方式來進行消息過濾。
Message msg = new Message("FilterTest", "tagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg);
消費端加入以下代碼便可
consumer.subscribe("FilterTest", MessageSelector.bySql("a between 0 and 3"));
broker的配置文件中須要指定對filter的支持,不然報錯:
enablePropertyFilter = true
33:提升吞吐量和性能的方案
增長機器數量提供多個consumer消費實例或者增長同一個consumer內部線程的並行度
設置批量獲取消息進行消費
topic下的隊列queue數量應該和消費者數量契合
生產者發送oneway消息
多生產者同時發送消息
34:源碼結構