初試RocketMQ消息中間件

1. 爲何要用MQ

  1. 在使用SpringCloud或Dubbo進行SOA架構後,不一樣的應用層模塊(web)與業務層模塊(service)要創建調用關係,也就是依賴/耦合
  2. 當模塊變多時,模塊間的耦合度也會逐步上升,這就須要一個解耦工具:消息中間件
  3. 另外,若是某個業務流程分爲不少步,某一步特別耗時間且不穩定,整個業務的穩定性就會受很大影響,這時也須要用消息中間件來分離這些不穩定的業務過程

2. 到底什麼是MQ

消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。java

在這裏面,關鍵的部分是「消息傳遞」和「消息排隊」,能夠保證事件的順序性,也能夠在高併發下使用。git

3. 何時能夠用MQ

執行過程長,且不須要返回結果的功能,能夠利用MQ傳遞(MQ的異步通訊特徵)github

4. MQ與JMS

JMS(Java Message Service),是一套接口規範,在jdk中已定義好接口(相似於JDBC,只有JDBC沒法操做數據庫,須要具體的驅動來實現功能)。web

4.1 JMS預約義的五種消息正文格式

  1. TextMessage(String)——普通文本(用得最多)
  2. MapMessage(Map)——鍵值對集合(用的次多)
  3. ObjectMessage(Serializable Object)——可序列化的對象
  4. BytesMessage(byte[])——字節數組
  5. StreamMessage(Stream)——流數據

4.2 JMS的消息傳遞

JMS的傳遞模式很是像觀察者模式的思路:spring

定義對象間的一種一對多的依賴關係,讓多個觀察者同時監聽某一個主題現象,當一個對象的狀態發生改變時,會通知全部觀察者對象,全部依賴於它的對象都獲得通知並被自動更新。數據庫

觀察者模式——https://my.oschina.net/LinkedBear/blog/1791975apache

消息傳遞的方式有兩種:數組

4.2.1 Queue點對點(生產者與消費者的一對一關係)

4.3.2 Topic發佈-訂閱(生產者與消費者的一對多關係)

5. MQ的工做原理

6. 不一樣MQ之間的對比

引用文章圖片:http://www.javashuo.com/article/p-gqzlvfbx-p.html架構

7. 怎麼用MQ

選用阿里巴巴的RocketMQ(現已被Apache接手),搭建Demo工程併發

參考文檔:http://rocketmq.apache.org/docs/simple-example/

7.1 安裝RocketMQ

參考文章:https://www.jianshu.com/p/4a275e779afa

從Apache的官網上下載運行包

配置環境變量

依次運行mqnamesrv.cmd腳本和mqbroker.cmd腳本

https://github.com/apache/rocketmq-externals.git下載監控插件,並解壓

進入「rocketmq-console\src\main\resources」文件夾,打開「application.properties」進行配置

進入「rocketmq-console」文件夾,執行「mvn clean package -Dmaven.test.skip=true」,編譯生成

進入「target」文件夾,執行「java -jar rocketmq-console-ng-1.0.0.jar」,啓動「rocketmq-console-ng-1.0.0.jar」(此jar爲SpringBoot項目)

7.2 搭建Maven工程框架

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.linkedbear</groupId>
     <artifactId>MQ-Demo</artifactId>
     <version>0.0.1-SNAPSHOT</version>
    
     <properties>
        <rocketmq.version>4.3.0</rocketmq.version>
     </properties>

     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
         <version>2.0.0.RELEASE</version>
     </parent>

     <dependencies>
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <!-- RocketMQ -->
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
             <version>${rocketmq.version}</version>
         </dependency>
        
         <!-- 熱部署 -->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-devtools</artifactId>
         </dependency>
     </dependencies>

     <build>
         <plugins>
              <plugin>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <configuration>
                       <source>1.8</source>
                       <target>1.8</target>
                   </configuration>
              </plugin>
         </plugins>
     </build>
</project>

7.3 建立工程目錄結構

7.4 生產者Controller

/**
 * 生產者Controller
 * @Title ProducerController
 * @author LinkedBear
 * @Time 2018年8月2日 下午3:22:02
 */
@Controller
public class ProducerController {
    //此分組名必須保證全局惟一(考慮到負載均衡等後續問題),故封裝爲靜態常量
    public static final String PRODUCE_GROUP_NAME = "TestGroup";
    //MQ的運行地址
    public static final String MQ_IP = "127.0.0.1:9876";
    
    @RequestMapping("/produceMessage")
    @ResponseBody
    public Map<String, Object> produceMessage() throws Exception {
        //1. 建立生產者鏈接(相似於JDBC中的Connection),要傳入MQ的分組名
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_GROUP_NAME);
        //2. 設置MQ的運行地址
        producer.setNamesrvAddr(MQ_IP);
        //3. 開啓鏈接
        producer.start();
        
        //4. 構造消息(重載方法較多,此處選擇topic, tag, message的三參數方法)
        Message message = new Message("test_topic", "test_tag", ("test_message。。。" + Math.random()).getBytes());
        //5. 發送消息,該方法會返回一個發送結果的對象
        SendResult result = producer.send(message);
        System.out.println(result.getSendStatus());
        //6. 關閉鏈接
        producer.shutdown();
        
        //此處將發送結果顯示在頁面上,方便查看
        Map<String, Object> map = new HashMap<>();
        map.put("消息", result.getSendStatus());
        return map;
    } 
}

7.5 消費者Controller

/**
 * 消費者Controller
 * @Title ConsumerController
 * @author LinkedBear
 * @Time 2018年8月2日 下午3:22:11
 */
@Controller
public class ConsumerController {
    @RequestMapping("/getMessage")
    @ResponseBody
    public void getMessage() throws Exception {
        //1. 建立消費者鏈接,要傳入MQ的分組名,該分組名在ProducerController中
        //此處建立的是pushConsumer,它使用監聽器,給人的感受是消息被推送的
        //pullConsumer,取消息的過程須要本身寫      
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ProducerController.PRODUCE_GROUP_NAME);
        //2. 設置MQ的運行地址
        consumer.setNamesrvAddr(ProducerController.MQ_IP);
        //3. 設置消息的提取順序
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //4. 設置消費者接收消息的Topic和Tag,此處對Tag不做限制
        consumer.subscribe("test_topic", "*");
        
        //5. 使用監聽器接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt messageExt : msgs) {
                        String message = new String(messageExt.getBody(), "utf-8");
                        System.out.println("收到消息【主題:" + messageExt.getTopic() + ", 正文:" + message + "】");
                    }
                    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    //轉換出現問題,稍後從新發送
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        
        //6. 啓動消費者
        consumer.start();
    }
}

7.6 測試運行

執行http://localhost:8080/produceMessage

 

執行http://localhost:8080/getMessage

相關文章
相關標籤/搜索