消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。java
在這裏面,關鍵的部分是「消息傳遞」和「消息排隊」,能夠保證事件的順序性,也能夠在高併發下使用。git
執行過程長,且不須要返回結果的功能,能夠利用MQ傳遞(MQ的異步通訊特徵)github
JMS(Java Message Service),是一套接口規範,在jdk中已定義好接口(相似於JDBC,只有JDBC沒法操做數據庫,須要具體的驅動來實現功能)。web
JMS的傳遞模式很是像觀察者模式的思路:spring
定義對象間的一種一對多的依賴關係,讓多個觀察者同時監聽某一個主題現象,當一個對象的狀態發生改變時,會通知全部觀察者對象,全部依賴於它的對象都獲得通知並被自動更新。數據庫
觀察者模式——https://my.oschina.net/LinkedBear/blog/1791975apache
消息傳遞的方式有兩種:數組
|
|
|
引用文章圖片:http://www.javashuo.com/article/p-gqzlvfbx-p.html架構
|
選用阿里巴巴的RocketMQ(現已被Apache接手),搭建Demo工程併發
參考文檔:http://rocketmq.apache.org/docs/simple-example/
7.1 安裝RocketMQ
參考文章:https://www.jianshu.com/p/4a275e779afa
從Apache的官網上下載運行包 |
配置環境變量 |
依次運行mqnamesrv.cmd腳本和mqbroker.cmd腳本 |
從https://github.com/apache/rocketmq-externals.git下載監控插件,並解壓 |
進入「rocketmq-console\src\main\resources」文件夾,打開「application.properties」進行配置 |
進入「rocketmq-console」文件夾,執行「mvn clean package -Dmaven.test.skip=true」,編譯生成 |
進入「target」文件夾,執行「java -jar rocketmq-console-ng-1.0.0.jar」,啓動「rocketmq-console-ng-1.0.0.jar」(此jar爲SpringBoot項目) |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.linkedbear</groupId> <artifactId>MQ-Demo</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <rocketmq.version>4.3.0</rocketmq.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <!-- 熱部署 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
/** * 生產者Controller * @Title ProducerController * @author LinkedBear * @Time 2018年8月2日 下午3:22:02 */ @Controller public class ProducerController { //此分組名必須保證全局惟一(考慮到負載均衡等後續問題),故封裝爲靜態常量 public static final String PRODUCE_GROUP_NAME = "TestGroup"; //MQ的運行地址 public static final String MQ_IP = "127.0.0.1:9876"; @RequestMapping("/produceMessage") @ResponseBody public Map<String, Object> produceMessage() throws Exception { //1. 建立生產者鏈接(相似於JDBC中的Connection),要傳入MQ的分組名 DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_GROUP_NAME); //2. 設置MQ的運行地址 producer.setNamesrvAddr(MQ_IP); //3. 開啓鏈接 producer.start(); //4. 構造消息(重載方法較多,此處選擇topic, tag, message的三參數方法) Message message = new Message("test_topic", "test_tag", ("test_message。。。" + Math.random()).getBytes()); //5. 發送消息,該方法會返回一個發送結果的對象 SendResult result = producer.send(message); System.out.println(result.getSendStatus()); //6. 關閉鏈接 producer.shutdown(); //此處將發送結果顯示在頁面上,方便查看 Map<String, Object> map = new HashMap<>(); map.put("消息", result.getSendStatus()); return map; } }
/** * 消費者Controller * @Title ConsumerController * @author LinkedBear * @Time 2018年8月2日 下午3:22:11 */ @Controller public class ConsumerController { @RequestMapping("/getMessage") @ResponseBody public void getMessage() throws Exception { //1. 建立消費者鏈接,要傳入MQ的分組名,該分組名在ProducerController中 //此處建立的是pushConsumer,它使用監聽器,給人的感受是消息被推送的 //pullConsumer,取消息的過程須要本身寫 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ProducerController.PRODUCE_GROUP_NAME); //2. 設置MQ的運行地址 consumer.setNamesrvAddr(ProducerController.MQ_IP); //3. 設置消息的提取順序 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //4. 設置消費者接收消息的Topic和Tag,此處對Tag不做限制 consumer.subscribe("test_topic", "*"); //5. 使用監聽器接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : msgs) { String message = new String(messageExt.getBody(), "utf-8"); System.out.println("收到消息【主題:" + messageExt.getTopic() + ", 正文:" + message + "】"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { //轉換出現問題,稍後從新發送 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); //6. 啓動消費者 consumer.start(); } }
執行http://localhost:8080/produceMessage:
|
|
執行http://localhost:8080/getMessage: |