手把手帶你瞭解消息中間件(3)——RocketMQ

1、RocketMQ簡介

  RocketMQ做爲一款純java、分佈式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。html

2、RocketMQ架構

  如圖所示爲RocketMQ基本的部署結構,主要分爲NameServer集羣、Broker集羣、Producer集羣和Consumer集羣四個部分。java

  Broker在啓動的時候會去向NameServer註冊而且定時發送心跳,Producer在啓動的時候會到NameServer上去拉取Topic所屬的Broker具體地址,而後向具體的Broker發送消息git

一、NameServer

  NameServer的做用是Broker的註冊中心。github

  每一個NameServer節點互相之間是獨立的,沒有任何信息交互,也就不存在任何的選主或者主從切換之類的問題,所以NameServer是很輕量級的。單個NameServer節點中存儲了活躍的Broker列表(包括master和slave),這裏活躍的定義是與NameServer保持有心跳。web

二、Topic、Tag、Queue、GroupName

  Topic 與 Tag 都是業務上用來歸類的標識,區分在於 Topic 是一級分類,而 Tag 能夠理解爲是二級分類spring

1) Topic(話題)

  Topic是生產者在發送消息和消費者在拉取消息的類別。Topic與生產者和消費者之間的關係很是鬆散。一個生產者能夠發送不一樣類型Topic的消息。消費者組能夠訂閱一個或多個主題,只要該組的實例保持其訂閱一致便可。數據庫

  咱們能夠理解爲Topic是第一級消息類型,好比一個電商系統的消息能夠分爲:交易消息、物流消息等,一條消息必須有一個Topic。apache

2) Tag(標籤)

  意思就是子主題,爲用戶提供了額外的靈活性。有了標籤,方便RocketMQ提供的查詢功能。數組

  能夠理解爲第二級消息類型,交易建立消息,交易完成消息..... 一條消息能夠沒有Tag服務器

3) Queue(隊列)

  一個topic下,能夠設置多個queue(消息隊列),默認4個隊列。當咱們發送消息時,須要要指定該消息的topic。

  RocketMQ會輪詢該topic下的全部隊列,將消息發送出去。

  在 RocketMQ 中,全部消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每一個存儲單元都是定長,訪問其中的存儲單元使用 Offset 來訪問,offset 爲 java long 類型,64 位,理論上在 100年內不會溢出,因此認爲是長度無限。

  也能夠認爲 Message Queue 是一個長度無限的數組,Offset 就是下標。

4) groupName(組名稱)

  RocketMQ中也有組的概念。表明具備相同角色的生產者組合或消費者組合,稱爲生產者組或消費者組。

  做用是在集羣HA的狀況下,一個生產者down以後,本地事務回滾後,能夠繼續聯繫該組下的另一個生產者實例,不至於致使業務走不下去。在消費者組中,能夠實現消息消費的負載均衡和消息容錯目標。

  有了GroupName,在集羣下,動態擴展容量很方便。只須要在新加的機器中,配置相同的GroupName。啓動後,就當即能加入到所在的羣組中,參與消息生產或消費。

三、Broker-存放消息

  Broker是具體提供業務的服務器,單個Broker節點與全部的NameServer節點保持長鏈接及心跳,定時(每隔30s)註冊Topic信息到全部Name Server。Name Server定時(每隔10s)掃描全部存活broker的鏈接,若是Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的鏈接。底層的通訊和鏈接都是基於Netty實現的。

  負載均衡:Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上,會自動輪詢當前全部可發送的broker ,儘可能平均分佈到全部隊列中,最終效果就是全部消息都平均落在每一個Broker上。

  高可用:Broker中分masterslave兩種角色,每一個master能夠對應多個slave,但一個slave只能對應一個master,master和slave經過指定相同的Brokername組成,其中不一樣的BrokerId==0 是master,非0是slave。

  高可靠併發讀寫服務:master和slave之間的同步方式分爲同步雙寫和異步複製,異步複製方式master和slave之間雖然會存在少許的延遲,但性能較同步雙寫方式要高出10%左右。

Topic、Broker、queue三者間的關係

四、Producer-生產消息

1) 與nameserver的關係

  單個Producer和一臺NameServer節點(隨機選擇)保持長鏈接,定時查詢topic配置信息,若是該NameServer掛掉,生產者會自動鏈接下一個NameServer,直到有可用鏈接爲止,並能自動重連。與NameServer之間沒有心跳。

2) 與broker的關係

  單個Producer和與其關聯的全部broker保持長鏈接,並維持心跳。默認狀況下消息發送採用輪詢方式,會均勻發到對應Topic的全部queue中。

五、Consumer-消費消息

1) 與nameserver的關係

  單個Consumer和一臺NameServer保持長鏈接,定時查詢topic配置信息,若是該NameServer掛掉,消費者會自動鏈接下一個NameServer,直到有可用鏈接爲止,並能自動重連。與NameServer之間沒有心跳。

2) 與broker的關係

  單個Consumer和與其關聯的全部broker保持長鏈接,並維持心跳,失去心跳後,則關閉鏈接,並向該消費者分組的全部消費者發出通知,分組內消費者從新分配隊列繼續消費。

5.1 消費者類型
  • 1) pull consume   Consumer 的一種,應用一般經過 Consumer 對象註冊一個 Listener 接口,一旦收到消息,Consumer 對象馬上回調 Listener 接口方法,相似於activemq的方式
  • 2) push consume   Consumer 的一種,應用一般主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制
5.2 消費模式
  • 1) 集羣模式

  在默認狀況下,就是集羣消費,此時消息發出去後將只有一個消費者能獲取消息。

  • 2) 廣播模式

  廣播消費,一條消息被多個Consumer消費。消息會發給Consume Group中的全部消費者進行消費。

3、RocketMQ的特性

一、消息順序

  消息的順序指的是消息消費時,能按照發送的順序來消費。

  RocketMQ是經過將「相同ID的消息發送到同一個隊列,而一個隊列的消息只由一個消費者處理「來實現順序消息

二、消息重複

1) 消息重複的緣由

  消息領域有一個對消息投遞的QoS(服務質量)定義,分爲:最多一次(At most once)、至少一次(At least once)、僅一次( Exactly once)。

  MQ產品都聲稱本身作到了At least once。既然是至少一次,就有可能發生消息重複。

  有不少緣由致使,好比:網絡緣由閃斷,ACK返回失敗等等故障,確認信息沒有傳送到消息隊列,致使消息隊列不知道本身已經消費過該消息了,再次將該消息分發給其餘的消費者

  不一樣的消息隊列發送的確認信息形式不一樣:RocketMQ返回一個CONSUME_SUCCESS成功標誌,RabbitMQ是發送一個ACK確認消息

2) 消息去重
  • 1) 去重原則:使用業務端邏輯保持冪等性

  冪等性:就是用戶對於同一操做發起的一次請求或者屢次請求的結果是一致的,不會由於屢次點擊而產生了反作用,數據庫的結果都是惟一的,不可變的。

  • 2) 只要保持冪等性,無論來多少條重複消息,最後處理的結果都同樣,須要業務端來實現。

  去重策略:保證每條消息都有惟一編號(好比惟一流水號),且保證消息處理成功與去重表的日誌同時出現。

4、RocketMQ的應用場景

一、削峯填谷

  好比如秒殺等大型活動時會帶來較高的流量脈衝,若是沒作相應的保護,將致使系統超負荷甚至崩潰。若是因限制太過致使請求大量失敗而影響用戶體驗,能夠利用MQ 超高性能的消息處理能力來解決。

二、異步解耦

  經過上、下游業務系統的鬆耦合設計,好比:交易系統的下游子系統(如積分等)出現不可用甚至宕機,都不會影響到核心交易系統的正常運轉。

三、順序消息

  FIFO原理相似,MQ提供的順序消息即保證消息的先進先出,能夠應用於交易系統中的訂單建立、支付、退款等流程。

四、分佈式事務消息

  好比阿里的交易系統、支付紅包等場景須要確保數據的最終一致性,須要引入 MQ 的分佈式事務,既實現了系統之間的解耦,又能夠保證最終的數據一致性。

5、RocketMQ集羣部署方式

一、單Mater模式

  優勢:配置簡單,方便部署

  缺點:風險較大,一旦Broker重啓或者宕機,會致使整個服務不可用

二、多Master模式

  一個集羣無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

  優勢:配置簡單,單個Master宕機重啓對應用沒有影響。消息不會丟失

  缺點:單臺機器宕機期間,這臺機器上沒有被消費的消息在恢復以前不可訂閱,消息實時性會受到影響。

三、多Master多Slave模式(異步)

  每一個Master配置一個Slave,採用異步複製方式,主備有短暫消息延遲

  優勢:由於Master 宕機後,消費者仍然能夠從 Slave消費,此過程對應用透明。不須要人工干預。性能同多 Master 模式幾乎同樣。

  缺點:Master宕機後,會丟失少許信息

四、多Master多Slave模式(同步)

  每一個Master配置一個Slave,採用同步雙寫方式,只有主和備都寫成功,才返回成功

  優勢:數據與服務都無單點, Master宕機狀況下,消息無延遲,服務可用性與數據可用性都很是高

  缺點:性能比異步複製模式略低,大約低 10%左右,發送單個消息的 RT會略高。目前主宕機後,備機不能自動切換爲主機,後續會支持自動切換功能

6、RocketMQ的消息類型

消息發送步驟:

消息消費步驟:

  建立一個maven工程,導入依賴

<dependencies>
        <!--rocket-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
       <!--順序消息中,模擬了一個消息集合,加入了lombok-->
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
    </dependencies>

一、普通消息

<details> <summary>點擊查看生產者代碼</summary> ```java /** * 普通消息生產者 */ public class Producer {

public static void main(String[] args) throws Exception {

// 建立一個消息發送入口對象,主要用於消息發送,指定生產者組 DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); // 設置NameServe地址,若是是集羣環境,用分號隔開 producer.setNamesrvAddr("127.0.0.1:9876"); // 啓動並建立消息發送組件 producer.start(); // topic的名字 String topic = "rocketDemo1"; // 標籤名 String taget = "tag"; // 要發送的數據 String body = "hello,RocketMq"; Message message = new Message(topic,taget,body.getBytes()); // 發送消息 SendResult result = producer.send(message); System.out.println(result); // 關閉消息發送對象 producer.shutdown(); } }

</details>
 <details>
<summary>點擊查看消費者代碼</summary>
```java
/**
 * 普通消息消費者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
//        建立一個消費管理對象,並建立消費者組名字
        DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("ConsumerGroup");
//        設置NameServer地址,若是是集羣環境,用逗號分隔
        consumerGroup.setNamesrvAddr("127.0.0.1:9876");
//        設置要讀取的消息主題和標籤
        consumerGroup.subscribe("rocketDemo1", "*");
//      設置回調函數,處理消息
        //注意:MessageListenerConcurrently     -- 並行消費監聽
        consumerGroup.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    //讀取消息記錄
                    for (MessageExt messageExt : msgs) {
                        //獲取消息主題
                        String topic = messageExt.getTopic();
                        //獲取消息標籤
                        String tags = messageExt.getTags();
                        //獲取消息體內容
                        String body = new String(messageExt.getBody(), "UTF-8");
                        System.out.println("topic:" + topic + ",tags:" + tags + ",body:" + body);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //返回消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//      運行消息消費對象
        consumerGroup.start();
    }
}

</details> #### 二、順序消息 &emsp;&emsp;消息有序指的是能夠按照消息的發送順序來消費。RocketMQ是經過將「相同ID的消息發送到同一個隊列,而一個隊列的消息只由一個消費者處理「來實現順序消息 。 **如何保證順序** - 1) 消息被髮送時保持順序:發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中採用同步的方式發送。 - 2) 消息被存儲時保持和發送的順序一致:存儲保持和發送的順序一致則要求在同一線程中被髮送出來的消息A和B,存儲時在空間上A必定在B以前。 - 3) 消息被消費時保持和存儲的順序一致:消費保持和存儲一致則要求消息A、B到達Consumer以後必須按照先A後B的順序被處理。

<details> <summary>點擊查看模擬消息代碼</summary> ```java /** * 模擬消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order {

private Long orderId;
private String desc;

public static List<Order> buildOrders(){
    List<Order> list = new ArrayList<Order>();
    Order order1001a = new Order(1001L,"建立");
    Order order1004a = new Order(1004L,"建立");
    Order order1006a = new Order(1006L,"建立");
    Order order1009a = new Order(1009L,"建立");
    list.add(order1001a);
    list.add(order1004a);
    list.add(order1006a);
    list.add(order1009a);
    Order order1001b = new Order(1001L,"付款");
    Order order1004b = new Order(1004L,"付款");
    Order order1006b = new Order(1006L,"付款");
    Order order1009b = new Order(1009L,"付款");
    list.add(order1001b);
    list.add(order1004b);
    list.add(order1006b);
    list.add(order1009b);
    Order order1001c = new Order(1001L,"完成");
    Order order1006c = new Order(1006L,"完成");
    list.add(order1001c);
    list.add(order1006c);
    return list;
}

}

</details>
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * Producer端確保消息順序惟一要作的事情就是將消息路由到特定的隊列,
 * 在RocketMQ中,經過MessageQueueSelector來實現分區的選擇
 */
public class ProducerOrder {
        //nameserver地址
        private static String namesrvaddress="127.0.0.1:9876;";

        public static void main(String[] args) throws Exception {
            //建立DefaultMQProducer
            DefaultMQProducer producer = new DefaultMQProducer("order_producer_name");
            //設置namesrv地址
            producer.setNamesrvAddr(namesrvaddress);
            //啓動Producer
            producer.start();
            List<Order> orderList = Order.buildOrders();
            for (Order order : orderList) {
                String body = order.toString();
                //建立消息
                Message message = new Message("orderTopic","order",body.getBytes());
                //發送消息
                SendResult sendResult = producer.send(
                        message,
                        new MessageQueueSelector() {
                            /**
                             *
                             * @param mqs topic中的隊列集合
                             * @param msg 消息對象
                             * @param arg 業務參數
                             * @return
                             */
                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                //參數是訂單id號
                                Long orderId = (Long) arg;
                                //肯定選擇的隊列的索引
                                long index = orderId % mqs.size();
                                return mqs.get((int) index);
                            }
                        },
                        order.getOrderId());
                System.out.println("發送結果="+sendResult);
            }
            //關閉Producer
            producer.shutdown();
        }
    }

</details> <details> <summary>點擊查看消費者代碼</summary> ```java /** * 消費者端實現MessageListenerOrderly介口監聽消息來實現順序消息 */ public class ConsumerOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //從第一個開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("orderTopic","*");
	//MessageListenerOrderly 順序消費
    consumer.registerMessageListener(new MessageListenerOrderly() {
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("當前線程:"+Thread.currentThread().getName()+",接收消息:"+new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer started.%n");
}

}

</details>
#### 三、延遲消息
&emsp;&emsp;RocketMQ 支持定時(延遲)消息,可是不支持任意時間精度,僅支持特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。  
  
&emsp;&emsp;延遲消息能夠在生產者中直接設置,也能夠在rocketmq的配置文件broker.conf中配置:messageDelayLevel=1s|5s|1m|2m|1h|2h......
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * 延遲消息 生產者
 */
public class ProducerDelay {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設置nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啓
        producer.start();
        //建立消息對象
        Message message = new Message("delayTopic","delay","hello world".getBytes());
        //設置延遲時間級別
        message.setDelayTimeLevel(2);
        //發送消息
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}

</details> <details> <summary>點擊查看消費者代碼</summary> ```java /** * 延遲消息 消費者 */ public class ConsumerDelay {

public static void main(String[] args) throws Exception {
    //建立消費者對象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
    //設置nameserver
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //設置主題和tag
    consumer.subscribe("delayTopic","*");
    //註冊消息監聽
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("消息ID:"+msg.getMsgId()+"發送時間:"+new Date(msg.getStoreTimestamp())+",延遲時間:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //開啓消費者
    consumer.start();
    System.out.println("消費者啓動");
}

}

</details>
#### 四、批量發送消息
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * 批量 生產者
 */
public class ProducerBatch {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設置nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啓
        producer.start();
        //建立消息對象  集合
        String topic = "batchTopic";
        String tag = "batch";
        List<Message> messageList = new ArrayList<Message>();
        Message message1 = new Message(topic,tag,"hello world1".getBytes());
        Message message2 = new Message(topic,tag,"hello world2".getBytes());
        Message message3 = new Message(topic,tag,"hello world3".getBytes());
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        //發送消息
        SendResult sendResult = producer.send(messageList);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}

</details> <details> <summary>點擊查看消費者代碼</summary> ```java /** * 批量消費者 */ public class ConsumerBatch { public static void main(String[] args) throws Exception { //建立消費者對象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //設置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //設置主題和tag consumer.subscribe("batchTopic","*"); //註冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("消息ID:"+msg.getMsgId());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //開啓消費者
    consumer.start();
    System.out.println("消費者啓動");
}

}

</details>
#### 五、廣播消息
&emsp;&emsp;rocketmq默認採用的是集羣消費,咱們想要使用廣播消費,只需在消費者中加入`consumer.setMessageModel(MessageModel.BROADCASTING)`這段配置,`MessageModel.CLUSTERING`爲集羣模式,是默認的;
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * 生產者
 */
public class ProducerBroadcast {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設置nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啓
        producer.start();
        //建立消息對象  集合
        String topic = "broadcastTopic";
        String tag = "broad";
        List<Message> messageList = new ArrayList<Message>();
        Message message1 = new Message(topic,tag,"hello world1".getBytes());
        Message message2 = new Message(topic,tag,"hello world2".getBytes());
        Message message3 = new Message(topic,tag,"hello world3".getBytes());
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        //發送消息
        SendResult sendResult = producer.send(messageList);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}

</details> <details> <summary>點擊查看消費者1代碼</summary> ```java /** * 消費者1 */ public class ConsumerBroadcast1 {

public static void main(String[] args) throws Exception {
    //建立消費者對象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
    //設置nameserver
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //設置主題和tag
    consumer.subscribe("broadcastTopic","*");
    //設置消息模式 爲 廣播模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    //註冊消息監聽
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("消費者1:消息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //開啓消費者
    consumer.start();
    System.out.println("消費者1啓動");
}

}

</details>
 <details>
<summary>點擊查看消費者2代碼</summary>
```java
/**
 * 消費者2
 */
public class ConsumerBroadcast2 {

    public static void main(String[] args) throws Exception {
        //建立消費者對象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        //設置nameserver
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設置主題和tag
        consumer.subscribe("broadcastTopic","*");
        //設置消息模式 爲 廣播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //註冊消息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費者2:消息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //開啓消費者
        consumer.start();
        System.out.println("消費者2啓動");
    }
}

</details> ### 7、SpringBoot整合RocketMQ &emsp;&emsp;建立一個maven工程,導入依賴 ```java <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.2.1.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> </dependencies> ``` <details> <summary>點擊查看模擬消息代碼</summary> ```java /** * 模擬消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order {

private Long orderId;
private String desc;

public static List<Order> buildOrders(){
    List<Order> list = new ArrayList<Order>();
    Order order1001a = new Order(1001L,"1001建立");
    Order order1004a = new Order(1004L,"1004建立");
    Order order1006a = new Order(1006L,"1006建立");
    Order order1009a = new Order(1009L,"1009建立");
    list.add(order1001a);
    list.add(order1004a);
    list.add(order1006a);
    list.add(order1009a);
    Order order1001b = new Order(1001L,"1001付款");
    Order order1004b = new Order(1004L,"1004付款");
    Order order1006b = new Order(1006L,"1006付款");
    Order order1009b = new Order(1009L,"1009付款");
    list.add(order1001b);
    list.add(order1004b);
    list.add(order1006b);
    list.add(order1009b);
    Order order1001c = new Order(1001L,"1001完成");
    Order order1006c = new Order(1006L,"1006完成");
    list.add(order1001c);
    list.add(order1006c);
    return list;
}

}

</details>
 <details>
<summary>點擊查看消息生產者代碼</summary>
```java
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 普通消息生產者
     */
    @Test
    public void testSend(){
        rocketMQTemplate.convertAndSend("testTopic","這是測試消息!");
    }
    
    /**
     * 延遲消息生產者
     */
    @Test
    public void testDelaySend(){
        SendResult sendResult = rocketMQTemplate.syncSend("testTopic",
                new GenericMessage("這是延遲測試消息!"+new Date()),
                10000,
                4);
        log.info("sendResult=="+sendResult);
    }

    /**
     * 順序消息 生產者
     */
    @Test
    public void testOrderlySend(){
        List<Order> orderList = Order.buildOrders();
        for (Order order : orderList) {
            //發送消息
            rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {

                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //參數是訂單id號
                    Long orderId = Long.valueOf((String)arg);
                    //肯定選擇的隊列的索引
                    long index = orderId % mqs.size();
                    log.info("mqs is ::" + mqs.get((int) index));
                    return mqs.get((int) index);
                }
            });
            SendResult sendOrderly = rocketMQTemplate.syncSendOrderly("testTopicOrderLy",
                    new GenericMessage<String>(order.toString()), order.getOrderId().toString());
            log.info("發送結果="+sendOrderly+",orderid :"+order.getOrderId());
        }
    }
}

</details> <details> <summary>點擊查看普通|延遲消費者代碼</summary> ```java /** * 普通、延遲消息 消費者代碼 */ @Component @RocketMQMessageListener(consumerGroup = "myConsumer", topic = "testTopic") public class RocketConsumer implements RocketMQListener<String> {

public void onMessage(String message) {
    System.out.println("接收到消息:="+message);
}

}

</details>
 <details>
<summary>點擊查看順序消費者代碼</summary>
```java
/**
 * 順序消息 ,消費者
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "myConsumerOrderly", topic = "testTopicOrderLy",consumeMode = ConsumeMode.ORDERLY)
public class RocketConsumerOrderly implements RocketMQListener<String> {
    
    public void onMessage(String message) {
       log.info("當前線程:"+Thread.currentThread().getName()+",接收到消息:="+message);
    }
}

</details> ### 8、RocketMQ的安裝配置 ##### 一、配置系統環境變量;計算機/屬性/高級系統設置/環境變量/系統變量,新建系統變量ROCKETMQ_HOME=RocketMQ安裝路徑 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109132659277-1403218980.png) ##### 二、進入RocketMQ安裝目錄的bin目錄下,右鍵用記事本打開修改runserver.cmd文件 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109135526592-344591921.png) ##### 三、修改runbroker.cmd文件 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109135534570-287023226.png) ##### 四、cmd進入到MQ/bin目錄下啓動 ```java 1.啓動mqnamesrv.cmd start mqnamesrv.cmd ``` 成功的彈窗,此框勿關閉。 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109134804446-1433923488.png) ```java 2.啓動mqbroker.cmd start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true ``` 成功的彈窗,此框勿關閉。 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109134835168-2104617731.png) 注意:假如彈出提示框提示‘錯誤: 找不到或沒法加載主類 xxxxxx’。打開runbroker.cmd,而後將‘%CLASSPATH%’加上英文雙引號。保存並從新執行start語句。 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109135553489-368231230.png) ##### 五、下載RocketMQ的可視化插件 - 1) 下載地址: https://github.com/apache/rocketmq-externals/releases

  • 2) 修改rocketmq-console\src\main\resources\application.properties,修改以下:

  • 3) cmd窗口執行:mvn clean package -Dmaven.test.skip=true

  • 4) jar包運行:java -jar rocketmq-console-ng-1.0.0.jar

  • 5) 測試輸入地址: http://127.0.0.1:8080/#/ops

原文出處:https://www.cnblogs.com/kaischoolmate/p/12147205.html

相關文章
相關標籤/搜索