微服務異步架構—MQ之RocketMQ

「咱們你們都知道把一個微服務架構變成一個異步架構只須要加一個MQ,如今市面上有不少MQ的開源框架。到底選擇哪個MQ的開源框架才合適呢?」html

1

什麼是MQ?MQ的原理是什麼?

MQ就是消息隊列,是Message Queue的縮寫。消息隊列是一種通訊方式。消息的本質就是一種數據結構。由於MQ把項目中的消息集中式的處理和存儲,因此MQ主要有解耦,併發,和削峯的功能。bash

1,解耦:

MQ的消息生產者和消費者互相不關心對方是否存在,經過MQ這個中間件的存在,使整個系統達到解耦的做用。服務器

若是服務之間用RPC通訊,當一個服務跟幾百個服務通訊時,若是那個服務的通訊接口改變,那麼幾百個服務的通訊接口都的跟着變更,這是很是頭疼的一件事。數據結構

可是採用MQ以後,不論是生產者或者消費者均可以單獨改變本身。他們的改變不會影響到別的服務。從而達到解耦的目的。爲何要解耦呢?說白了就是方便,減小沒必要要的工做量。架構

2,併發

MQ有生產者集羣和消費者集羣,因此客戶端是億級用戶時,他們都是並行的。從而大大提高響應速度。併發

3,削峯

由於MQ能存儲的消息量很大,因此他能夠把大量的消息請求先存下了,而後再併發的方式慢慢處理。框架

若是採用RPC通訊,每一次請求用調用RPC接口,當請求量巨大的時候,由於RPC的請求是很耗資源的,因此巨大的請求必定會壓垮服務器。異步

削峯的目的是用戶體驗變好,而且使整個系統穩定。能承受大量請求消息。分佈式

2

如今市面上有什麼MQ,

重點介紹RocketMQ

如今市面上的MQ有不少,主要有RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ,Kafka等等,這些都是開源的MQ產品。之前不少人推薦使用RabbitMQ,他也是很是好用的MQ產品,這裏不作過多的介紹。Kafka也是高吞吐量的老大,咱們這裏也不介紹。ide

咱們重點介紹一下RocketMQ,RocketMQ是阿里巴巴在2012年開源的分佈式消息中間件,目前已經捐贈給Apache軟件基金會,並於並於2017年9月25日成爲 Apache 的頂級項目。

做爲經歷過屢次阿里巴巴雙十一這種「超級工程」的洗禮並有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被愈來愈多的國內企業使用。

功能概覽圖

能夠看見RocketMQ支持定時和延時消息,這是RabbitMQ所沒有的能力。

RocketMQ的物理結構

從這裏能夠看出,RocketMQ涉及到四大集羣,producer,Name Server,Consumer,Broker。

Producer集羣:

是生產者集羣,負責產生消息,向消費者發送由業務應用程序系統生成的消息,RocketMQ提供三種方式發送消息:同步,異步,單向。

一,普通消息

1,同步原理圖

同步消息關鍵代碼

try {
        SendResult sendResult = producer.send(msg);
        // 同步發送消息,只要不拋異常就是成功
        if (sendResult != null) {
        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());

    }
    catch (Exception e) {

        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
        e.printStackTrace();
    }
}複製代碼

2,異步原理圖

異步消息關鍵代碼

producer.sendAsync(msg, new SendCallback() {

@Override
public void onSuccess(final SendResult sendResult) {

       // 消費發送成功
      System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());

 }
 
@Override
public void onException(OnExceptionContext context) { 

   System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());

}
});複製代碼

3,單向(Oneway)發送原理圖

單向只發送,不等待返回,因此速度最快,通常在微秒級,但可能丟失

單向(Oneway)發送消息關鍵代碼

producer.sendOneway(msg);複製代碼

三種發送消息具體代碼請參考文檔:https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.6.566.7e49793fuueSlB

二,定時消息和延時消息

發送定時消息關鍵代碼

try {

     // 定時消息,單位毫秒(ms),在指定時間戳(當前時間以後)進行投遞,例如 2016-03-07 16:21:00 投遞。若是被設置成當前時間戳以前的某個時刻,消息將馬上投遞給消費者。
    long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
    msg.setStartDeliverTime(timeStamp);

    // 發送消息,只要不拋異常就是成功
    SendResult sendResult = producer.send(msg);
    System.out.println("MessageId:"+sendResult.getMessageId());

}
catch (Exception e) {

    // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理
    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());

    e.printStackTrace();

 }複製代碼

發送延時消息關鍵代碼

try {
   
    // 延時消息,單位毫秒(ms),在指定延遲時間(當前時間以後)進行投遞,例如消息在 3 秒後投遞
    long delayTime = System.currentTimeMillis() + 3000;

    // 設置消息須要被投遞的時間 msg.setStartDeliverTime(delayTime);
     SendResult sendResult = producer.send(msg);
     // 同步發送消息,只要不拋異常就是成功
     if (sendResult != null) {
        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
      }
      
} catch (Exception e) {
   
   // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理
    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());

    e.printStackTrace();

 }複製代碼

注意事項

1,定時和延時消息的 msg.setStartDeliverTime 參數須要設置成當前時間戳以後的某個時刻(單位毫秒)。若是被設置成當前時間戳以前的某個時刻,消息將馬上投遞給消費者。

2,定時和延時消息的 msg.setStartDeliverTime 參數可設置40天內的任什麼時候刻(單位毫秒),超過40天消息發送將失敗。

3,StartDeliverTime 是服務端開始向消費端投遞的時間。 若是消費者當前有消息堆積,那麼定時和延時消息會排在堆積消息後面,將不能嚴格按照配置的時間進行投遞。

4,因爲客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在誤差。

5,設置定時和延時消息的投遞時間後,依然受 3 天的消息保存時長限制。例如,設置定時消息 5 天后才能被消費,若是第 5 天后一直沒被消費,那麼這條消息將在第8天被刪除。

6,除 Java 語言支持延時消息外,其餘語言都不支持延時消息。

發佈消息原理圖

三,事務消息

RocketMQ提供相似X/Open XA的分佈式事務功能來確保業務發送方和MQ消息的最終一致性,其本質是經過半消息的方式把分佈式事務放在MQ端來處理。

原理圖

其中:

​ 1,發送方向消息隊列 RocketMQ 服務端發送消息。

​ 2,服務端將消息持久化成功以後,向發送方 ACK 確認消息已經發送成功,此時消息爲半消息。

​ 3,發送方開始執行本地事務邏輯。

​ 4,發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半消息標記爲可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。

​ 5,在斷網或者是應用重啓的特殊狀況下,上述步驟 4 提交的二次確認最終未到達服務端,通過固定時間後服務端將對該消息發起消息回查。

​ 6,發送方收到消息回查後,須要檢查對應消息的本地事務執行的最終結果。

​ 7,發送方根據檢查獲得的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半消息進行操做。

RocketMQ的半消息機制的注意事項是

1,根據第六步能夠看出他要求發送方提供業務回查接口。

2,不能保證發送方的消息冪等,在ack沒有返回的狀況下,可能存在重複消息

3,消費方要作冪等處理。

核心代碼

final BusinessService businessService = new BusinessService(); // 本地業務

TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());

producer.start();

Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());

try {
    
    SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
    
    @Override
    public TransactionStatus execute(Message msg, Object arg) {
    
        // 消息 ID(有可能消息體同樣,但消息 ID 不同,當前消息 ID 在控制檯沒法查詢)
        String msgId = msg.getMsgID();
        
        // 消息體內容進行 crc32,也可使用其它的如 MD5
        long crc32Id = HashUtil.crc32Code(msg.getBody());
       
        // 消息 ID 和 crc32id 主要是用來防止消息重複
        // 若是業務自己是冪等的,能夠忽略,不然須要利用 msgId 或 crc32Id 來作冪等
        // 若是要求消息絕對不重複,推薦作法是對消息體 body 使用 crc32 或 MD5 來防止重複消息
        Object businessServiceArgs = new Object();
        
        TransactionStatus transactionStatus =TransactionStatus.Unknow;
        
        try {
        
        boolean isCommit = businessService.execbusinessService(businessServiceArgs);
        
        if (isCommit) {
        
        // 本地事務成功則提交消息 transactionStatus = TransactionStatus.CommitTransaction;
        
        } else {
        
        // 本地事務失敗則回滾消息 transactionStatus = TransactionStatus.RollbackTransaction;
        
        }
        
        } catch (Exception e) {log.error("Message Id:{}", msgId, e);
        
        }
        
        System.out.println(msg.getMsgID());log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
         
         return transactionStatus;
    }
    }, null);
    }

catch (Exception e) {
  
  // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理
   System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());

   e.printStackTrace();

}複製代碼

具體代碼參考文檔:https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.570.5d5738a49FJl1t

全部消息發佈原理圖

producer徹底無狀態,能夠集羣部署。

Name Server集羣:

NameServer是一個幾乎無狀態的節點,可集羣部署,節點之間無任何信息同步,NameServer很像註冊中心的功能。

據說阿里以前的NameServer 是用ZooKeeper作的,可能由於Zookeeper不能知足大規模併發的要求,因此以後NameServer 是阿里自研的。

NameServer其實就是一個路由表,他管理Producer和Comsumer之間的發現和註冊。

Broker集羣:

Broker部署相對複雜,Broker分爲Master與Slave,一個Master能夠對應多個Slaver,可是一個Slaver只能對應一個Master,Master與Slaver的對應關係經過指定相同的BrokerName。

不一樣的BrokerId來定義,BrokerId爲0表示Master,非0表示Slaver。Master能夠部署多個。每一個Broker與NameServer集羣中的全部節點創建長鏈接,定時註冊Topic信息到全部的NameServer。

Consumer集羣:

訂閱方式

消息隊列 RocketMQ 支持如下兩種訂閱方式:

集羣訂閱:同一個 Group ID 所標識的全部 Consumer 平均分攤消費消息。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那麼在集羣消費模式下每一個實例平均分攤,只消費其中的 3 條消息。

// 集羣訂閱方式設置(不設置的狀況下,默認爲集羣訂閱方式)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);複製代碼

廣播訂閱:同一個 Group ID 所標識的全部 Consumer 都會各自消費某條消息一次。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那麼在廣播消費模式下每一個實例都會各自消費 9 條消息。

// 廣播訂閱方式設置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);複製代碼

訂閱消息關鍵代碼:

Consumer consumer = ONSFactory.createConsumer(properties);

consumer.subscribe("TopicTestMQ", "TagA||TagB", **new** MessageListener() { //訂閱多個 Tag

public Action consume(Message message, ConsumeContext context) {

   System.out.println("Receive: " + message);
   return Action.CommitMessage;
}
});


//訂閱另一個 Topic

consumer.subscribe("TopicTestMQ-Other", "*", **new** MessageListener() { //訂閱所有 Tag

public Action consume(Message message, ConsumeContext context) {

    System.out.println("Receive: " + message);
    return Action.CommitMessage;
}
});

consumer.start();複製代碼

注意事項:

消費端要作冪等處理,全部MQ基本上都不會作冪等處理,須要業務端處理,緣由是若是在MQ端作冪等處理會帶來MQ的複雜度,並且嚴重影響MQ的性能。

消息收發模型

主子帳號建立

建立主子帳號的緣由是權限問題。下面是主帳號建立流程圖

詳細操做地址:https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.6.555.38c57f91JXUK7o

子帳號流程圖

詳細操做地址:https://help.aliyun.com/document_detail/96402.html?spm=a2c4g.11186623.6.556.60194fedfSkxIB

3

MQ是微服務架構

很是重要的部分

MQ的誕生把原來的同步架構思惟轉變到異步架構思惟提供一種方法,爲大規模,高併發的業務場景的穩定性實現提供了很好的解決思路。

Martin Fowler強調:分佈式調用的第一原則就是不要分佈式。這句話看似頗具哲理,然而就企業應用系統而言,只要整個系統在不停地演化,並有多個子系統共同存在時,這條原則就會被迫打破。

Martin Fowler提出的這條原則,一方面是但願設計者可以審慎地對待分佈式調用,另外一方面卻也是分佈式系統自身存在的缺陷所致。

因此微服務並非萬能藥,適合的架構纔是最好的架構。

相關文章
相關標籤/搜索