RocketMQ 是阿里巴巴在2012年開源的分佈式消息中間件,目前已經捐贈給 Apache 軟件基金會,並於2017年9月25日成爲 Apache 的頂級項目。做爲經歷過屢次阿里巴巴雙十一這種「超級工程」的洗禮並有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被愈來愈多的國內企業使用。其主要特色有:java
RocketMQ 自然支持集羣,其核心四組件(Name Server、Broker、Producer、Consumer)每個均可以在沒有單點故障的狀況下進行水平擴展。git
RocketMQ 採用零拷貝原理實現超大的消息的堆積能力,聽說單機已能夠支持億級消息堆積,並且在堆積了這麼多消息後依然保持寫入低延遲。github
能夠保證消息消費者按照消息發送的順序對消息進行消費。順序消息分爲全局有序和局部有序,通常推薦使用局部有序,即生產者經過將某一類消息按順序發送至同一個隊列來實現。spring
消息過濾分爲在服務器端過濾和在消費端過濾。服務器端過濾時能夠按照消息消費者的要求作過濾,優勢是減小沒必要要消息傳輸,缺點是增長了消息服務器的負擔,實現相對複雜。消費端過濾則徹底由具體應用自定義實現,這種方式更加靈活,缺點是不少無用的消息會傳輸給消息消費者。apache
RocketMQ 除了支持普通消息,順序消息以外還支持事務消息,這個特性對於分佈式事務來講提供了又一種解決思路。後端
回溯消費是指消費者已經消費成功的消息,因爲業務上需求須要從新消費,RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,能夠向前回溯,也能夠向後回溯。bash
下面是一張 RocketMQ 的部署結構圖,裏面涉及了 RocketMQ 核心的四大組件:Name Server、Broker、Producer、Consumer ,每一個組件均可以部署成集羣模式進行水平擴展。服務器
生產者(Producer)負責產生消息,生產者向消息服務器發送由業務應用程序系統生成的消息。 RocketMQ 提供了三種方式發送消息:同步、異步和單向。網絡
同步發送指消息發送方發出數據後會在收到接收方發回響應以後才發下一個數據包。通常用於重要通知消息,例如重要通知郵件、營銷短信。負載均衡
異步發送指發送方發出數據後,不等接收方發回響應,接着發送下個數據包,通常用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如用戶視頻上傳後通知啓動轉碼服務。
單向發送是指只負責發送消息而不等待服務器迴應且沒有回調函數觸發,適用於某些耗時很是短但對可靠性要求並不高的場景,例如日誌收集。
生產者組(Producer Group)是一類 Producer 的集合,這類 Producer 一般發送一類消息而且發送邏輯一致,因此將這些 Producer 分組在一塊兒。從部署結構上看生產者經過 Producer Group 的名字來標記本身是一個集羣。
消費者(Consumer)負責消費消息,消費者從消息服務器拉取信息並將其輸入用戶應用程序。站在用戶應用的角度消費者有兩種類型:拉取型消費者、推送型消費者。
拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啓動消費過程,因此 Pull 稱爲主動消費型。
推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其餘的內部維護工做,將消息到達時執行的回調接口留給用戶應用程序來實現。因此 Push 稱爲被動消費類型,但從實現上看仍是從消息服務器中拉取消息,不一樣於 Pull 的是 Push 首先要註冊消費監聽器,當監聽器處觸發後纔開始消費消息。
消費者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 一般消費同一類消息而且消費邏輯一致,因此將這些 Consumer 分組在一塊兒。消費者組與生產者組相似,都是將相同角色的分組在一塊兒並命名,分組是個很精妙的概念設計,RocketMQ 正是經過這種分組機制,實現了自然的消息負載均衡。消費消息時經過 Consumer Group 實現了將消息分發到多個消費者服務器實例,好比某個 Topic 有9條消息,其中一個 Consumer Group 有3個實例(3個進程或3臺機器),那麼每一個實例將均攤3條消息,這也意味着咱們能夠很方便的經過加機器來實現水平擴展。
消息服務器(Broker)是消息存儲中心,主要做用是接收來自 Producer 的消息並存儲, Consumer 從這裏取得消息。它還存儲與消息相關的元數據,包括用戶組、消費進度偏移量、隊列信息等。從部署結構圖中能夠看出 Broker 有 Master 和 Slave 兩種類型,Master 既能夠寫又能夠讀,Slave 不能夠寫只能夠讀。從物理結構上看 Broker 的集羣部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。
這種方式一旦 Broker 重啓或宕機會致使整個服務不可用,這種方式風險較大,因此顯然不建議線上環境使用。
全部消息服務器都是 Master ,沒有 Slave 。這種方式優勢是配置簡單,單個 Master 宕機或重啓維護對應用無影響。缺點是單臺機器宕機期間,該機器上未被消費的消息在機器恢復以前不可訂閱,消息實時性會受影響。
每一個 Master 配置一個 Slave,因此有多對 Master-Slave,消息採用異步複製方式,主備之間有毫秒級消息延遲。這種方式優勢是消息丟失的很是少,且消息實時性不會受影響,Master 宕機後消費者能夠繼續從 Slave 消費,中間的過程對用戶應用程序透明,不須要人工干預,性能同多 Master 方式幾乎同樣。缺點是 Master 宕機時在磁盤損壞狀況下會丟失極少許消息。
每一個 Master 配置一個 Slave,因此有多對 Master-Slave ,消息採用同步雙寫方式,主備都寫成功才返回成功。這種方式優勢是數據與服務都沒有單點問題,Master 宕機時消息無延遲,服務與數據的可用性很是高。缺點是性能相對異步複製方式略低,發送消息的延遲會略高。
名稱服務器(NameServer)用來保存 Broker 相關元信息並給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設計成幾乎無狀態的,能夠橫向擴展,節點之間相互之間無通訊,經過部署多臺機器來標記本身是一個僞集羣。每一個 Broker 在啓動的時候會到 NameServer 註冊,Producer 在發送消息前會根據 Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。因此從功能上看應該是和 ZooKeeper 差很少,聽說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,後來改成了本身實現的 NameServer 。
消息(Message)就是要傳輸的信息。一條消息必須有一個主題(Topic),主題能夠看作是你的信件要郵寄的地址。一條消息也能夠擁有一個可選的標籤(Tag)和額處的鍵值對,它們能夠用於設置一個業務 key 並在 Broker 上查找此消息以便在開發期間查找問題。
主題(Topic)能夠看作消息的規類,它是消息的第一級類型。好比一個電商系統能夠分爲:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產者和消費者的關係很是鬆散,一個 Topic 能夠有0個、1個、多個生產者向其發送消息,一個生產者也能夠同時向不一樣的 Topic 發送消息。一個 Topic 也能夠被 0個、1個、多個消費者訂閱。
標籤(Tag)能夠看做子主題,它是消息的第二級類型,用於爲用戶提供額外的靈活性。使用標籤,同一業務模塊不一樣目的的消息就能夠用相同 Topic 而不一樣的 Tag 來標識。好比交易消息又能夠分爲:交易建立消息、交易完成消息等,一條消息能夠沒有 Tag 。標籤有助於保持您的代碼乾淨和連貫,而且還能夠爲 RocketMQ 提供的查詢系統提供幫助。
消息隊列(Message Queue),主題被劃分爲一個或多個子主題,即消息隊列。一個 Topic 下能夠設置多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的全部隊列將消息發出去。下圖 Broker 內部消息狀況:
消息消費模式有兩種:集羣消費(Clustering)和廣播消費(Broadcasting)。默認狀況下就是集羣消費,該模式下一個消費者集羣共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,若是某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。而廣播消費消息會發給消費者組中的每個消費者進行消費。
消息順序(Message Order)有兩種:順序消費(Orderly)和並行消費(Concurrently)。順序消費表示消息消費的順序同生產者爲每一個消息隊列發送的順序一致,因此若是正在處理全局順序是強制性的場景,須要確保使用的主題只有一個消息隊列。並行消費再也不保證消息順序,消費的最大並行數量受每一個消費者客戶端指定的線程池限制。
RocketMQ 目前支持 Java、C++、Go 三種語言訪問,按慣例以 Java 語言爲例看下如何用 RocketMQ 來收發消息的。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
複製代碼
添加 RocketMQ 客戶端訪問支持,具體版本和安裝的 RocketMQ 版本一致便可。
package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
//建立一個消息生產者,並設置一個消息生產者組
DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//初始化 Producer,整個應用生命週期內只須要初始化一次
producer.start();
for (int i = 0; i < 100; i++) {
//建立一條消息對象,指定其主題、標籤和消息內容
Message msg = new Message(
"topic_example_java" /* 消息主題名 */,
"TagA" /* 消息標籤 */,
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
);
//發送消息並返回結果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生產者實例再也不被使用則將其關閉,包括清理資源,關閉網絡鏈接等
producer.shutdown();
}
}
複製代碼
示例中用 DefaultMQProducer 類來建立一個消息生產者,一般一個應用建立一個 DefaultMQProducer 對象,因此通常由應用來維護生產者對象,能夠其設置爲全局對象或者單例。該類構造函數入參 producerGroup 是消息生產者組的名字,不管生產者仍是消費者都必須給出 GroupName ,並保證該名字的惟一性,ProducerGroup 發送普通的消息時做用不大,後面介紹分佈式事務消息時會用到。
接下來指定 NameServer 地址和調用 start 方法初始化,在整個應用生命週期內只須要調用一次 start 方法。
初始化完成後,調用 send 方法發送消息,示例中只是簡單的構造了100條一樣的消息發送,其實一個 Producer 對象能夠發送多個主題多個標籤的消息,消息對象的標籤能夠爲空。send 方法是同步調用,只要不拋異常就標識成功。
最後應用退出時調用 shutdown 方法清理資源、關閉網絡鏈接,從服務器上註銷本身,一般建議應用在 JBOSS、Tomcat 等容器的退出鉤子裏調用 shutdown 方法。
package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
//建立一個消息消費者,並設置一個消息消費者組
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//設置 Consumer 第一次啓動時從隊列頭部開始消費仍是隊列尾部開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱指定 Topic 下的全部消息
consumer.subscribe("topic_example_java", "*");
//註冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默認 list 裏只有一條消息,能夠經過設置參數來批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消費者對象在使用以前必需要調用 start 初始化
consumer.start();
System.out.println("消息消費者已啓動");
}
}
複製代碼
示例中用 DefaultMQPushConsumer 類來建立一個消息消費者,通生產者同樣一個應用通常建立一個 DefaultMQPushConsumer 對象,該對象通常由應用來維護,能夠其設置爲全局對象或者單例。該類構造函數入參 consumerGroup 是消息消費者組的名字,須要保證該名字的惟一性。
接下來指定 NameServer 地址和設置消費者應用程序第一次啓動時從隊列頭部開始消費仍是隊列尾部開始消費。
接着調用 subscribe 方法給消費者對象訂閱指定主題下的消息,該方法第一個參數是主題名,第二個擦書是標籤名,示例表示訂閱了主題名 topic_example_java 下全部標籤的消息。
最主要的是註冊消息監聽器才能消費消息,示例中用的是 Consumer Push 的方式,即設置監聽器回調的方式消費消息,默認監聽回調方法中 List 裏只有一條消息,能夠經過設置參數來批量接收消息。
最後調用 start 方法初始化,在整個應用生命週期內只須要調用一次 start 方法。
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
複製代碼
RocketMQ 核心的四大組件中 Name Server 和 Broker 都是由 RocketMQ 安裝包提供的,因此要啓動這兩個應用才能提供消息服務。首先啓動 Name Server,先確保你的機器中已經安裝了與 RocketMQ 相匹配的 JDK ,並設置了環境變量 JAVA_HOME ,而後在 RocketMQ 的安裝目錄下執行 bin 目錄下的 mqnamesrv ,默認會將該命令的執行狀況輸出到當前目錄的 nohup.out 文件,最後跟蹤日誌文件查看 Name Server 的實際運行狀況。
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
複製代碼
一樣也要確保你的機器中已經安裝了與 RocketMQ 相匹配的 JDK ,並設置了環境變量 JAVA_HOME ,而後在 RocketMQ 的安裝目錄下執行 bin 目錄下的 mqbroker ,默認會將該命令的執行狀況輸出到當前目錄的 nohup.out 文件,最後跟蹤日誌文件查看 Broker 的實際運行狀況。
先運行 Consumer 類,這樣當生產者發送消息的時候能在消費者後端看到消息記錄。配置沒問題的話會看到在控制檯打印出消息消費者已啓動
最後運行 Producer 類,在 Consumer 的控制檯能看到接收的消息
不一樣於 RabbitMQ、ActiveMQ、Kafka 等消息中間件,Spring 社區已經經過多種方式提供了對這些中間件產品集成,例如經過 spring-jms 整合 ActiveMQ、經過 Spring AMQP 項目下的 spring-rabbit 整合 RabbitMQ、經過 spring-kafka 整合 kafka ,經過他們能夠在 Spring 項目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三種方式,一是將消息生產者和消費者定義成 bean 對象交由 Spring 容器管理,二是使用 RocketMQ 社區的外部項目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)而後經過 spring-jms 方式集成使用,三是若是你的應用是基於 spring-boot 的,可使用 RocketMQ 的外部項目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比較方便的收發消息。
總的來說 rocketmq-jms 項目實現了 JMS 1.1 規範的部份內容,目前支持 JMS 中的發佈/訂閱模型收發消息。rocketmq-spring-boot-starter 項目目前已經支持同步發送、異步發送、單向發送、順序消費、並行消費、集羣消費、廣播消費等特性,若是比較喜歡 Spring Boot 這種全家桶的快速開發框架而且現有特性已知足業務要求可使用該項目。固然從 API 使用上最靈活的仍是第一種方式,下面以第一種方式爲例簡單看下Spring 如何集成 RocketMQ 的。
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class SpringProducer {
private Logger logger = Logger.getLogger(getClass());
private String producerGroupName;
private String nameServerAddr;
private DefaultMQProducer producer;
public SpringProducer(String producerGroupName, String nameServerAddr) {
this.producerGroupName = producerGroupName;
this.nameServerAddr = nameServerAddr;
}
public void init() throws Exception {
logger.info("開始啓動消息生產者服務...");
//建立一個消息生產者,並設置一個消息生產者組
producer = new DefaultMQProducer(producerGroupName);
//指定 NameServer 地址
producer.setNamesrvAddr(nameServerAddr);
//初始化 SpringProducer,整個應用生命週期內只須要初始化一次
producer.start();
logger.info("消息生產者服務啓動成功.");
}
public void destroy() {
logger.info("開始關閉消息生產者服務...");
producer.shutdown();
logger.info("消息生產者服務已關閉.");
}
public DefaultMQProducer getProducer() {
return producer;
}
}
複製代碼
消息生產者就是把生產者 DefaultMQProducer 對象的生命週期分紅構造函數、init、destroy 三個方法,構造函數中將生產者組名、NameServer 地址做爲變量由 Spring 容器在配置時提供,init 方法中實例化 DefaultMQProducer 對象、設置 NameServer 地址、初始化生產者對象,destroy 方法用於生產者對象銷燬時清理資源。
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class SpringConsumer {
private Logger logger = Logger.getLogger(getClass());
private String consumerGroupName;
private String nameServerAddr;
private String topicName;
private DefaultMQPushConsumer consumer;
private MessageListenerConcurrently messageListener;
public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
this.consumerGroupName = consumerGroupName;
this.nameServerAddr = nameServerAddr;
this.topicName = topicName;
this.messageListener = messageListener;
}
public void init() throws Exception {
logger.info("開始啓動消息消費者服務...");
//建立一個消息消費者,並設置一個消息消費者組
consumer = new DefaultMQPushConsumer(consumerGroupName);
//指定 NameServer 地址
consumer.setNamesrvAddr(nameServerAddr);
//設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱指定 Topic 下的全部消息
consumer.subscribe(topicName, "*");
//註冊消息監聽器
consumer.registerMessageListener(messageListener);
// 消費者對象在使用以前必需要調用 start 初始化
consumer.start();
logger.info("消息消費者服務啓動成功.");
}
public void destroy(){
logger.info("開始關閉消息消費者服務...");
consumer.shutdown();
logger.info("消息消費者服務已關閉.");
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
}
複製代碼
同消息生產者相似,消息消費者是把生產者 DefaultMQPushConsumer 對象的生命週期分紅構造函數、init、destroy 三個方法,具體含義在介紹 Java 訪問 RocketMQ 實例時已經介紹過了,再也不贅述。固然,有了消費者對象還須要消息監聽器在接收到消息後執行具體的處理邏輯。
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class MessageListener implements MessageListenerConcurrently {
private Logger logger = Logger.getLogger(getClass());
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list != null) {
for (MessageExt ext : list) {
try {
logger.info("監聽到消息 : " + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
複製代碼
消息監聽器類就是把前面 Java 示例中註冊消息監聽器時聲明的匿名內部類代碼抽取出來定義成單獨一個類而已。
由於只使用 Spring 框架集成,因此除了 Sping 框架核心 jar 包外不須要額外添加依賴包了。本例中將消息生產者和消息消費者分紅兩個配置文件,這樣能更好的演示收發消息的效果。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
<constructor-arg name="nameServerAddr" value="localhost:9876"/>
<constructor-arg name="producerGroupName" value="spring_producer_group"/>
</bean>
</beans>
複製代碼
消息生產者配置很簡單,定義了一個消息生產者對象,該對象初始化時調用 init 方法,對象銷燬前執行 destroy 方法,將 Name Server 地址和生產者組配置好。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />
<bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
<constructor-arg name="nameServerAddr" value="localhost:9876"/>
<constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
<constructor-arg name="topicName" value="spring-rocketMQ-topic" />
<constructor-arg name="messageListener" ref="messageListener" />
</bean>
</beans>
複製代碼
消息消費者同消息生產者配置相似,多了一個消息監聽器對象的定義和綁定。
按前述步驟 啓動 Name Server 和 Broker,接着運行消息生產者和消息消費者程序,簡化起見咱們用兩個單元測試類模擬這兩個程序:
package org.study.mq.rocketMQ.spring;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringProducerTest {
private ApplicationContext container;
@Before
public void setup() {
container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
}
@Test
public void sendMessage() throws Exception {
SpringProducer producer = container.getBean(SpringProducer.class);
for (int i = 0; i < 20; i++) {
//建立一條消息對象,指定其主題、標籤和消息內容
Message msg = new Message(
"spring-rocketMQ-topic",
null,
("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
);
//發送消息並返回結果
SendResult sendResult = producer.getProducer().send(msg);
System.out.printf("%s%n", sendResult);
}
}
}
複製代碼
SpringProducerTest 類模擬消息生產者發送消息。
package org.study.mq.rocketMQ.spring;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringConsumerTest {
private ApplicationContext container;
@Before
public void setup() {
container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
}
@Test
public void consume() throws Exception {
SpringConsumer consumer = container.getBean(SpringConsumer.class);
Thread.sleep(200 * 1000);
consumer.destroy();
}
}
複製代碼
SpringConsumerTest 類模擬消息消費者者接收消息,在 consume 方法返回以前須要讓當前線程睡眠一段時間,使消費者程序繼續存活才能監聽到生產者發送的消息。
分別運行 SpringProducerTest 類 和 SpringConsumerTest 類,在 SpringConsumerTest 的控制檯能看到接收的消息:
假如啓動兩個 SpringConsumerTest 類進程,由於它們屬於同一消費者組,在 SpringConsumerTest 的控制檯能看到它們均攤到了消息: