RocketMq架構原理和使用總結

RocketMQ是一個分佈式開放消息中間件,底層基於隊列模型來實現消息收發功能。RocketMQ集羣中包含4個模塊:Namesrv, Broker, Producer, Consumer。html

RocketMq架構原理和使用總結

主要功能

  • 削峯填谷(主要解決瞬時寫壓力大於應用服務能力致使消息丟失、系統奔潰等問題)
  • 系統解耦(解決不一樣重要程度、不一樣能力級別系統之間依賴致使一死全死)
  • 提高性能(當存在一對多調用時,能夠發一條消息給消息系統,讓消息系統通知相關係統)
  • 蓄流壓測(線上有些鏈路很差壓測,能夠經過堆積必定量消息再放開來壓測)

各個模塊的做用

  • Namesrv: 存儲當前集羣全部Brokers信息、Topic跟Broker的對應關係。
  • Broker: 集羣最核心模塊,主要負責Topic消息存儲、消費者的消費位點管理(消費進度)。
  • Producer: 消息生產者,每一個生產者都有一個ID(編號),多個生產者實例能夠共用同一個ID。同一個ID下全部實例組成一個生產者集羣。
  • Consumer: 消息消費者,每一個訂閱者也有一個ID(編號),多個消費者實例能夠共用同一個ID。同一個ID下全部實例組成一個消費者集羣。

各個模塊功能關係參考博客:https://www.cnblogs.com/wxd0108/p/6041829.html架構

功能架構部署圖:app

RocketMq架構原理和使用總結

MQ集羣工做流程

  1. 啓動Namesrv,Namesrv起來後監聽端口,等待Broker、Produer、Consumer連上來,至關於一個路由控制中心。異步

  2. Broker啓動,跟全部的Namesrv保持長鏈接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲全部topic信息。註冊成功後,namesrv集羣中就有Topic跟Broker的映射關係。分佈式

  3. 收發消息前,先建立topic,建立topic時須要指定該topic要存儲在哪些Broker上。也能夠在發送消息時自動建立Topic。ide

  4. Producer發送消息,啓動時先跟Namesrv集羣中的其中一臺創建長鏈接,並從Namesrv中獲取當前發送的Topic存在哪些Broker上,而後跟對應的Broker創建長鏈接,直接向Broker發消息。性能

  5. Consumer跟Producer相似。跟其中一臺Namesrv創建長鏈接,獲取當前訂閱Topic存在哪些Broker上,而後直接跟Broker創建鏈接通道,開始消費消息。

Producer

示例代碼:日誌

這裏用InitializingBean, DisposableBean來管理mq的生命週期,InitializingBean用來初始化mq配置信息,DisposableBean 在mq執行完成後用來銷燬bean。code

@Component
public class CancelDisplayProducer implements InitializingBean, DisposableBean {

    private static final Logger logger= LoggerFactory.getLogger(CancelDisplayProducer.class);

    private DefaultMQProducer defaultMQProducer;
    @Value("${crk.topic}")
    private String topicName;

    @Value("${crk.nameServer}")
    private String nameServer;

    @Value(("${crk.groupName}"))
    private String groupName;

    public SendResult sendCancelDisplayMq(String tag, String msg, Object primaryKey, Object hashVal){
        logger.info("發送取消延時隊列消息內容{}",msg);
        Message rocketMsg = null;
        com.alibaba.rocketmq.client.producer.SendResult sendResult = null;
        try {
            rocketMsg =  new Message(topicName, tag, primaryKey + "", msg.getBytes("UTF-8"));
            //設置該消息延遲1s發送
            rocketMsg.setDelayTimeLevel(1);
            sendResult = defaultMQProducer.send(rocketMsg, new MessageQueueSelector() {
            //發送順序消息
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
                    int hashCode = obj.hashCode();
                    if(hashCode < 0) {
                        hashCode = Math.abs(hashCode);
                    }
                    int index = hashCode % list.size();
                    return list.get(index);
                }
            }, hashVal);
            if(sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("發送取消延時隊列消息成功,發送內容:{},keys:{}", msg, primaryKey);
            }
        } catch (Exception e) {
            logger.error("發送取消延時隊列消息異常【{}】", e);
        }
        return sendResult;

    }

    @Override
    public void destroy() throws Exception {
        defaultMQProducer.shutdown();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("groupName=" + groupName);
        logger.info("nameServer=" + nameServer);
        //初始化
        defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr(nameServer);
        defaultMQProducer.setProducerGroup(groupName);
        defaultMQProducer.setRetryTimesWhenSendFailed(5);
        defaultMQProducer.setInstanceName("openCarCancelDisplayInstance");
        //設置超時時間爲5s
        defaultMQProducer.setSendMsgTimeout(5000);
        defaultMQProducer.start();
        logger.info("DefaultMQProudcer start success!");
    }
}

//***調用生產者發送消息***
cancelDisplayProducer.sendCancelDisplayMq("cancleDisplay",JSONObject.toJSONString(bodyJson),orderNo,orderNo);

Producer順序發送

Rocketmq可以保證消息嚴格順序,可是Rocketmq須要producer保證順序消息按順序發送到同一個queue中,好比購買流程(1)下單(2)支付(3)支付成功,htm

這三個消息須要根據特定規則將這個三個消息按順序發送到一個queue
Producer端確保消息順序惟一要作的事情就是將消息路由到特定的分區(這裏的分區能夠理解爲不一樣的隊列),在RocketMQ中,經過MessageQueueSelector來實現分區的選擇。

如何實現把順序消息發送到同一個queue:

RocketMq架構原理和使用總結

通常消息是經過輪詢全部隊列發送的,順序消息能夠根據業務好比說訂單號orderId相同的消息發送到同一個隊列, 或者同一用戶userId發送到同一隊列等等

messageQueueList [orderId%messageQueueList.size()]

messageQueueList [userId%messageQueueList.size()]

Consumer

示例代碼:

@Component
public class CancelDisplayConsumer implements InitializingBean, DisposableBean {

    private static  final String CANCEL_DISPLAY_GROUP_NAME="cancle_display_consumer_group";

    private static  final  String CANCLE_DISPLAY_INSTANCE_NAME="cancle_display_consumer_instance";

    private static  final Logger logger= LoggerFactory.getLogger(CancelDisplayConsumer.class);

    private DefaultMQPushConsumer consumer;

    @Autowired
    private CancelDisplayProducer cancelDisplayProducer;
    @Autowired
    private IComTransChannelConfigService comTransChannelConfigService;
    @Value("${crk.nameServer}")
    private String nameServer;
    @Value("${crk.topic}")
    private String topicName;
    @Autowired
    private IHongqiOrderMappingService hongqiOrderMappingService;
    @Override
    public void destroy() throws Exception {
        consumer.shutdown();
        logger.info("訂單取消延時隊列消費消息關閉");
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            consumer = new DefaultMQPushConsumer(CANCEL_DISPLAY_GROUP_NAME);
            consumer.setNamesrvAddr(nameServer);
            consumer.setInstanceName(CANCLE_DISPLAY_INSTANCE_NAME);
            consumer.subscribe(topicName, "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {

                        for(MessageExt messageExt : list) {
                            logger.info("消費取消延遲消息start:{}", list);
                            String body = new String(messageExt.getBody());
                            JSONObject bodyJson = JSONObject.parseObject(body);
                            String orderNo = bodyJson.getString("orderNo");
                            String channel=bodyJson.getString("channel");
                            MDC.put("traceId", messageExt.getMsgId());
                            //邏輯代碼忽略.........
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            logger.error("消費延遲取消消息consume啓動異常:{}",e);
        }
    }
}

如何保證消息不丟失

分別從Producer發送機制、Broker的持久化機制,以及消費者的offSet機制來最大程度保證消息不易丟失

1、producer重試發送消息

  1. 默認狀況下,能夠經過同步的方式阻塞式的發送,check SendStatus,狀態是OK,表示消息必定成功的投遞到了Broker,狀態超時或者失敗,則會觸發默認的2次重試。此方法的發送結果,可能Broker存儲成功了,也可能沒成功

  2. 採起事務消息的投遞方式,並不能保證消息100%投遞成功到了Broker,可是若是消息發送Ack失敗的話,此消息會存儲在CommitLog當中,可是對ConsumerQueue是不可見的。能夠在日誌中查看到這條異常的消息,嚴格意義上來說,也並無徹底丟失

  3. RocketMQ支持 日誌的索引,若是一條消息發送以後超時,也能夠經過查詢日誌的API,來check是否在Broker存儲成功

2、broker的持久化機制

  1. 消息支持持久化到Commitlog裏面,即便宕機後重啓,未消費的消息也是能夠加載出來的

2.Broker自身支持同步刷盤、異步刷盤的策略,能夠保證接收到的消息必定存儲在本地的內存中

  1. Broker集羣支持 1主N從的策略,支持同步複製和異步複製的方式,同步複製能夠保證即便Master 磁盤崩潰,消息仍然不會丟失

3、消費端的重試機制

消費者能夠根據自身的策略批量Pull消息

  1. Consumer自身維護一個持久化的offset(對應MessageQueue裏面的min offset),標記已經成功消費或者已經成功發回到broker的消息下標

  2. 若是Consumer消費失敗,那麼它會把這個消息發回給Broker,發回成功後,再更新本身的offset

  3. 若是Consumer消費失敗,發回給broker時,broker掛掉了,那麼Consumer會定時重試這個操做

若是Consumer和broker一塊兒掛了,消息也不會丟失,由於consumer 裏面的offset是定時持久化的,重啓以後,繼續拉取offset以前的消息到本地

關於offset:

RocketMQ 中, 一 種類型的消息會放到 一 個 Topic 裏,爲了可以並行, 通常一個 Topic 會有多個 Message Queue (也能夠 設置成一個), Offset是指某個 Topic下的一條消息在某個 Message Queue裏的 位置,經過 Offset的值能夠定位到這條消息,或者指示 Consumer從這條消息 開始向後繼續處理。

Offset主要分爲本地文件類型和 Broker代存的類型兩種。

RocketMq架構原理和使用總結

Rocketmq集羣有兩種消費模式

默認是 CLUSTERING 模式,也就是同一個 Consumer group 裏的多個消費者每人消費一部分,各自收到的消息內容不同。 這種狀況下,由 Broker 端存儲和控制 Offset 的值,使用 RemoteBrokerOffsetStore 結構。

BROADCASTING模式下,每一個 Consumer 都收到這個 Topic 的所有消息,各個 Consumer 間相互沒有干擾, RocketMQ 使用 LocalfileOffsetStore,把 Offset存到本地。

原文來自:http://suo.im/5wYoLH

相關文章
相關標籤/搜索