RocketMQ知識

MQ基礎概念:

  • MQ:
    消息總線(Message Queue),是一種跨進程的通訊機制,用於上下游傳遞消息。在互聯網架構中,MQ是一種很是常見的上下游「邏輯解耦+物理解耦」的消息通訊服務。使用MQ以後,消息發送上游只須要依賴MQ,邏輯上和物理上都不用依賴其餘服務。
    MQ的不足
    (1)系統更加複雜,多了一個MQ組件
    (2)消息傳遞路徑更長,延時會增長
    (3)消息可能會被重複消費
    (4)上游沒法知道下游的執行結果(所以,調用方實時依賴執行結果的業務場景,請使用調用,而不是MQ)docker

    使用場景
    (1)上游不關注執行結果
    (2)上游關注結果,但執行時間比較長。舉個例子,微信支付,跨公網調用微信的接口,執行時間會比較長,但調用方又很是關注執行結果,此時通常怎麼玩呢?
    clipboard.png
    通常採用「回調網關+MQ」方案來解耦:微信

    a、調用方直接跨公網調用微信接口
    b、微信返回調用成功,此時並不表明返回成功
    c、微信執行完成後,回調統一網關
    d、網關將返回結果通知MQ
    e、請求方收到結果通知
  • rocketMQ:
    RocketMQ 是什麼?
    Github 上關於 RocketMQ 的介紹:
    RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件。具備如下特性:

    支持發佈/訂閱(Pub/Sub)和點對點(P2P)消息模型
    在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
    支持拉(pull)和推(push)兩種消息模式
    單一隊列百萬消息的堆積能力
    支持多種消息協議,如 JMS、MQTT 等
    分佈式高可用的部署架構,知足至少一次消息傳遞語義
    提供 docker 鏡像用於隔離測試和雲集羣部署
    提供配置、指標和監控等功能豐富的 Dashboard架構

  • consumer group:
    一、概念:消費者分組,多個消費者在一個消費者分組中。
    二、注意點:一個consumer group中的機器至關於一個集羣,consumer group中只有一臺機器會接收到消息,並進行消費。每個consumer group都會接收到消息。這樣子的設計要求消費端須要保證冪等性。
  • topic:
    一、概念:Topic 是一種消息的邏輯分類,好比說你有訂單類的消息,也有庫存類的消息,那麼就須要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。
    二、生產方發出的消息綁定某個topic,而後消費方監聽某個topic,消費方(各個group)接收到消息,進行消費
    三、topic應用級別:整個應用最好都使用一個topic,而更加細的區分,使用tags來區分。
  • tag:
    一、概念:標籤,用於對消息分類,在topic的基礎上進行更細的劃分。
  • nameServer:
    一、概念:Name Server 爲 producer 和 consumer 提供路由信息。相似rpc中的註冊中心。當producer須要發送消息首先去詢問nameServer須要請求哪一個broker。而當consumer須要拉取消息,也會先詢問nameServer須要請求哪一個broker。
  • broker:
    一、概念:rocketMQ中負責接收生產者消息、給消費者發送消息的組件。
  • Message:
    一、概念:Message 是消息的載體。一個 Message 必須指定 topic。Message 還有一個可選的 tag 設置,以便消費端能夠基於 tag 進行過濾消息。也能夠添加額外的鍵值對,例如你須要一個業務 key 來查找 broker 上的消息,方便在開發過程當中診斷問題。

MQ生產者者實例:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //聲明並初始化一個producer
        //須要一個producer group名字做爲構造方法的參數,這裏爲producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        
        //設置NameServer地址,此處應改成實際NameServer地址,多個地址之間用;分隔
        //NameServer的地址必須有,可是也能夠經過環境變量的方式設置,不必定非得寫死在代碼裏
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
        
        //調用start()方法啓動一個producer實例
        producer.start();

        //發送10條消息到Topic爲TopicTest,tag爲TagA,消息內容爲「Hello RocketMQ」拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                
                //調用producer的send()方法發送消息
                //這裏調用的是同步的方式,因此會有返回結果
                SendResult sendResult = producer.send(msg);
                
                //打印返回結果,能夠看到消息發送的狀態以及一些相關信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        //發送完消息以後,調用shutdown()方法關閉producer
        producer.shutdown();
    }
}

MQ消費者實例:

在開發過程當中,若是想測試生產者是否發出了mq,能夠編寫一個消費者進行測試分佈式

@Test
public void testMqConsumer() throws Exception {
    String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876";

    int threadNum = 5;
    String topics = "WechatUnionCoreTemplateNotifyTopic";
    String instanceName = "TemplateComsumer";
    String groupName = "wechatUnionTemplateNotifyConsumer";
    DefaultMQPushConsumer consumer = null;

    consumer = new DefaultMQPushConsumer(groupName);
    consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
    consumer.setClientCallbackExecutorThreads(threadNum);//消費現場數量
    consumer.setInstanceName(instanceName);//實例名稱
    consumer.subscribe(topics, "*");


    //註冊監聽
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            for (int i = 0; i < msgs.size(); i++) {
                MessageExt msgExt =  msgs.get(i);
                String msgId = msgExt.getMsgId();
                Integer flag = msgExt.getFlag();
                TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class);
                logger.info("receive new Msg:    " + "  msgId=" + msgId + "   flag=" + flag + "  templateNotifyItem=" + templateNotifyItem);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    logger.info("監聽執行中");



    Thread.sleep(1000000);
}

參考:
http://blog.csdn.net/manzhizh...
https://www.jianshu.com/p/824...
架構師之路ide

相關文章
相關標籤/搜索