消息隊列 RocketMQ 是阿里巴巴集團自主研發的專業消息中間件。 產品基於高可用分佈式集羣技術,提供消息訂閱和發佈、消息軌跡查詢、定時(延時)消息、資源統計、監控報警等一系列消息雲服務,是企業級互聯網架構的核心產品。 消息隊列 RocketMQ 歷史超過9年,爲分佈式應用系統提供異步解耦、削峯填谷的能力,同時具有海量消息堆積、高吞吐、可靠重試等互聯網應用所需的特性,是阿里巴巴雙11使用的核心產品。java
打開阿里雲產品,找到 rocketMQspring
這裏須要咱們根據須要開通包年仍是包月服務,開通成功後進入控制檯json
根據提示建立實例、建立Topics、建立Groupspringboot
建立好了以後,打開 Topic 管理,手動發送一條消息架構
能夠看到發送成功後會返回信息的 messageIDapp
首先引入 pom 異步
<!--消息隊列 RocketMQ--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.7.9.Final</version> </dependency>
定義 rocketMQ 配置分佈式
@Configuration public class RocketMQConfig { public Properties getProperties(){ Properties properties=new Properties(); /** * 鍵的首字母必須大寫 */ properties.setProperty("AccessKey","**"); // properties.setProperty("SecretKey","**"); // properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 順序消息消費失敗進行重試前的等待時間,單位(毫秒) properties.put(PropertyKeyConst.SuspendTimeMillis, "100"); // 消息消費失敗時的最大重試次數 properties.put(PropertyKeyConst.MaxReconsumeTimes, "20"); // properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1944503281593155_BaOTPbFU.mq-internet-access.mq-internet.aliyuncs.com:80"); return properties; } }
AccessKey、SecretKey 可在阿里雲我的信息中找到ide
NAMESRV_ADDR 是實例的接入點測試
定義消息發送者
@Component public class RocketMQProducer { @Autowired private RocketMQConfig rocketMQConfig; /** * 一、發送普通消息 * * @param message * @return */ public boolean sendNormalMessage(Message message,String groupId) { Properties properties=rocketMQConfig.getProperties(); properties.setProperty(PropertyKeyConst.GROUP_ID,groupId); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用 start 方法來啓動 Producer,只需調用一次便可 producer.start(); try { SendResult sendResult = producer.send(message); // 同步發送消息,只要不拋異常就是成功 if (sendResult != null) { System.out.println("消息發送成功:messageID:"+sendResult.getMessageId()); return true; } } catch (Exception e) { // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理 e.printStackTrace(); } return false; } }
定義消息消費者
@Component public class RocketMQConsumer { @Autowired private RocketMQConfig rocketMQConfig; /** * 一、普通訂閱 * * @param */ public void normalSubscribe( ) { Properties properties = rocketMQConfig.getProperties(); properties.put(PropertyKeyConst.GROUP_ID, "GID-test"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("test", "*", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + new String(message.getBody())); //把消息轉化爲java對象 //JSONObject jsonObject=JSONObject.parseObject(jsonString); //Book book= jsonObject.toJavaObject(Book.class); return Action.CommitMessage; } }); consumer.start(); } }
測試類
@Autowired private RocketMQProducer rocketMQProducer; @Autowired private RocketMQConsumer rocketMQConsumer; //發送信息 @RequestMapping("/send") public String send(String msg){ // test 是建立的topic是名稱, tag 是消息的二級分類,能夠填空 Message message=new Message("test","tag",msg.getBytes()); // GID-test 是 發送信息組ID rocketMQProducer.sendNormalMessage(message,"GID-test"); return "ok"; } //接收信息 @RequestMapping("/receive") public String receive(){ rocketMQConsumer.normalSubscribe(); return "ok"; }
啓動項目,訪問 send 和 receive,控制檯打印以下
消息發送成功:messageID:C0A83292361818B4AAC23C548787000F Receive: 測試
到這裏說明整合成功。最後咱們只須要在啓動項目的時候啓動消費者。spring 監聽器能夠實現,或者能夠經過實現接口 CommandLineRunner
@Component public class RocketConsumerListener implements CommandLineRunner { @Autowired private RocketMQConsumer rocketMQConsumer; @Override public void run(String... args) throws Exception { System.out.println("========rocketMQ消費者啓動=========="); rocketMQConsumer.normalSubscribe(); } }
這樣在啓動項目的時候消費者也被啓動。到此springboot和rocketMQ的整合就完成啦。