本文代碼示例參見:https://gitee.com/imlichao/RocketMQ-examplehtml
Apache RocketMQ文檔:http://rocketmq.apache.org/docs/quick-start/java
阿里雲RocketMQ文檔:https://help.aliyun.com/product/29530.htmlgit
消息隊列 RocketMQ 是阿里巴巴集團自主研發的專業消息中間件,基於高可用分佈式集羣技術,提供消息訂閱和發佈、消息軌跡查詢以及定時(延時)消息、資源統計、監控報警等一系列消息雲服務,是企業級互聯網架構的核心產品。 消息隊列 RocketMQ 歷史超過9年,爲分佈式應用系統提供異步解耦、削峯填谷的能力,同時具有海量消息堆積、高吞吐、可靠重試等互聯網應用所需的特性,是阿里巴巴雙11使用的核心產品。web
消息隊列 RocketMQ 是阿里雲正式商用的產品,目前在阿里雲多個地域(Region)提供了高可用消息雲服務,單個域內採用多機房部署,可用性極高,即便整個機房都不可用,仍然能夠爲應用提供消息發佈服務,產品穩定性及可用性徹底按照阿里巴巴內部標準來實施,無單點。spring
消息隊列 RocketMQ 目前提供 TCP 和 HTTP 協議層面的接入方式,支持 Java、C++、 .NET、Go、Python、Nodejs、PHP 這七種編程語言,方便不一樣編程語言開發的應用快速接入消息隊列 RocketMQ 消息雲服務。 用戶能夠將應用部署在阿里雲 ECS、企業自建雲,或者嵌入到移動端、物聯網設備中與消息隊列 RocketMQ 創建鏈接進行消息收發,同時本地開發者也能夠經過公網接入消息隊列 RocketMQ 服務進行消息收發。apache
消息隊列 RocketMQ 支持「發佈/訂閱」模型,消息發佈者(生產者)能夠將一條消息發送服務端的某個主題(Topic),多個消息接收方(消費者)訂閱這個主題以接收該消息,以下圖所示:編程
本例使用阿里雲RocketMQ產品,其中用戶名密碼地址等使用「XXXXXX」表示。api
增長maven依賴服務器
<!-- 增長RocketMQ依賴 --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.0.Final</version> </dependency>
配置類架構
本例使用兩個同組下的消費者共同消費,而且消息採用exactly-once語義保證每一個消息只被消費一次
package pub.imlichao.config; import com.aliyun.openservices.ons.api.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Properties; /** * RocketMQ配置 */ @Configuration public class RocketMQConfig { /** * 生產者配置 * @return */ @Bean public Producer producer () { Properties properties = new Properties(); // 鑑權用 AccessKey,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.AccessKey,"XXXXXX"); // 鑑權用 SecretKey,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.SecretKey, "XXXXXX"); // 設置 TCP 接入域名,進入控制檯的實例管理頁面,在頁面上方選擇實例後,在實例信息中的「獲取接入點信息」區域查看 properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXXXXX"); //經過 PropertyKeyConst.EXACTLYONCE_DELIVERY 開啓 exactly-once 投遞語義(保證擁有多個消費者時消息只被消費一次) properties.put(PropertyKeyConst.EXACTLYONCE_DELIVERY, "true"); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用 start 方法來啓動 Producer,只需調用一次便可 producer.start(); return producer; } /** * 消費者1配置 * @return */ @Bean public Consumer consumer1 () { Properties properties = new Properties(); // 您在控制檯建立的 Group ID properties.put(PropertyKeyConst.GROUP_ID, "GID_pmall_consumer"); // 鑑權用 AccessKey,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.AccessKey,"XXXXXX"); // 鑑權用 SecretKey,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.SecretKey, "XXXXXX"); // 設置 TCP 接入域名,進入控制檯的實例管理頁面,在頁面上方選擇實例後,在實例信息中的「獲取接入點信息」區域查看 properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXXXXX"); Consumer consumer = ONSFactory.createConsumer(properties); //建立消息監聽和消息處理邏輯 consumer.subscribe("pmall_message", "data_storage", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive1: " + new String (message.getBody()) + " " + message.getMsgID()); return Action.CommitMessage; } }); //啓動監聽 consumer.start(); return consumer; } /** * 消費者2配置 * @return */ @Bean public Consumer consumer2 () { Properties properties = new Properties(); // 您在控制檯建立的 Group ID properties.put(PropertyKeyConst.GROUP_ID, "GID_pmall_consumer"); // 鑑權用 AccessKey,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.AccessKey,"XXXXXX"); // 鑑權用 SecretKey,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.SecretKey, "XXXXXX"); // 設置 TCP 接入域名,進入控制檯的實例管理頁面,在頁面上方選擇實例後,在實例信息中的「獲取接入點信息」區域查看 properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXXXXX"); Consumer consumer = ONSFactory.createConsumer(properties); //建立消息監聽和消息處理邏輯 consumer.subscribe("pmall_message", "data_storage", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive2: " + new String (message.getBody()) + " " + message.getMsgID()); return Action.CommitMessage; } }); //啓動監聽 consumer.start(); return consumer; } }
發送消息
package pub.imlichao; import com.aliyun.openservices.ons.api.*; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import pub.imlichao.config.RocketMQConfig; import javax.annotation.Resource; import java.util.Date; @Controller public class ProducerTest { @Resource private RocketMQConfig rocketMQConfig; /** * 向RocketMQ發送消息 * @return */ @GetMapping(value = "/send") public String send(){ //循環發送消息100次 for (int i =0;i<100;i++){ //建立消息 Message msg = new Message( // 在控制檯建立的 Topic,即該消息所屬的 Topic 名稱 "pmall_message", // Message Tag。可理解爲 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在消息隊列 RocketMQ 服務器過濾 "data_storage", // Message Body。任何二進制形式的數據,消息隊列 RocketMQ 不作任何干預。須要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 ("pmall MQ "+ new Date()).getBytes()); // 設置表明消息的業務關鍵屬性,請儘量全局惟一,以方便您在沒法正常收到消息狀況下,可經過控制檯查詢消息並補發。注意:不設置也不會影響消息正常收發 msg.setKey("data_id"); // 發送消息,只要不拋異常就是成功 SendResult sendResult = rocketMQConfig.producer().send(msg); //打印 Message ID,以便用於消息發送狀態查詢 System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId()); } return "redirect:/"; } }