RocketMQ專題1:入門

RocketMQ入門

源碼和應用下載

​ 這裏以RocketMQ的4.3.0版本爲例,本地環境爲windows10,jdk1.8, maven3.2.1.java

源碼下載地址: http://mirrors.hust.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zipios

應用下載地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.0/rocketmq-all-4.3.0-bin-release.zipapache

啓動

​ Windows下須要配置環境變量,ROCKETMQ_HOME, 我這裏配置爲: E:\software\rocketmq-all-4.3.0-bin-releasewindows

​ 配置完環境變量後,就能夠進入到bin目錄:異步

  • 啓動server: 直接運行bin目錄下的mqnamesrv.cmdasync

  • 啓動broker: 運行mqbroker.cmd,發現一閃而過,查看bin目錄下的bk.log日誌,發現錯誤日誌以下:maven

    錯誤: 找不到或沒法加載主類 Files\Java\jdk1.8.0_121\lib;C:\Program

    再查看mqbroker.cmd源碼,發現其最終調用了runbroker.cmd。該腳本的倒數第二行爲:ide

    set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"

    知道問題所在: CLASSPATH的配置中是包含空格的,而空格致使最終解析出來的路徑錯誤。最終我修改倒數第二行爲:函數

    set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""

    ​至此能夠順利啓動測試

​ 本覺得啓動以後就能就行消息收發了,因而我按照官網示例進入RocketMQ的bin目錄,並經過命令向broker發送消息:

tools org.apache.rocketmq.example.quickstart.Producer

​ 結果一直報錯,搜索得知在windows下須要配置環境變量NAMESRV_ADDR127.0.0.1:9876

​ 配置完成以後,再依次啓動mqnamesrv和mqbroker,從新測試Producer發現Producer的輸出大體以下:

......
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9C03E5, offsetMsgId=C0A8130100002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=0], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9E03E6, offsetMsgId=C0A8130100002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115EA003E7, offsetMsgId=C0A8130100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=2], queueOffset=249]
11:44:47.790 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10911] result: true
11:44:47.791 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:44:47.793 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10909] result: true

​ 在經過命令行運行Consumer:

tools org.apache.rocketmq.example.quickstart.Consumer

​ 發現Consumer的輸出爲:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=249, sysFlag=0, bornTimestamp=1537242287776, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287778, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BDFE, commitLogOffset=179710, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409812, UNIQ_KEY=C0A8029D46D461BBE9BA5A115EA003E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 57], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=248, sysFlag=0, bornTimestamp=1537242287768, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287768, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BB2E, commitLogOffset=178990, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409811, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9803E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53], transactionId='null'}]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=247, sysFlag=0, bornTimestamp=1537242287761, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287761, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B85E, commitLogOffset=178270, bodyCRC=684865321, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9103DF, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 49], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=246, sysFlag=0, bornTimestamp=1537242287753, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287753, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B58E, commitLogOffset=177550, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E8903DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]]

關閉

​ 關閉的步驟與啓動正好相反

  • 關閉brokermqshutdown broker
  • 關閉namesrvmqshutdown namesrv

簡單示例

​ 在進行簡單的示例以前,咱們先要知道爲何會出現RocketMQ,下面一段話摘自RocketMQ官網:

Based on our research, with increased queues and virtual topics in use, ActiveMQ IO module reaches a bottleneck. We tried our best to solve this problem through throttling, circuit breaker or degradation, but it did not work well. So we begin to focus on the popular messaging solution Kafka at that time. Unfortunately, Kafka can not meet our requirements especially in terms of low latency and high reliability, see here for details.

In this context, we decided to invent a new messaging engine to handle a broader set of use cases, ranging from traditional pub/sub scenarios to high volume real-time zero-loss tolerance transaction system. We believe this solution can be beneficial, so we would like to open source it to the community. Today, more than 100 companies are using the open source version of RocketMQ in their business. We also published a commercial distribution based on RocketMQ, a PaaS product called the Alibaba Cloud Platform.

​ 能夠知道RocketMQ是阿里在使用ActiveMQ時,出現了IO瓶頸,沒法知足阿里業務所須要的低延遲和高可靠性要求時本身研發出來。而且最終捐贈給Apache,成爲頂級開源項目的。high volume real-time zero-loss tolerance transaction system是其核心特色。

​ 下面經過一個簡單的示例,來講明RocketMQ的基本使用:

引入pom依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Producer

​ Producer通常分爲三種模式: 同步、異步和單向,具體代碼以下:

public class SimpleProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException,
            MQBrokerException, InterruptedException {
        /**
         * 同步消息發送: 通常用來進行通知、短信等重要消息的同步
         */
        // syncProducer();
        
        /**
         * 異步消息發送: 通常用來對方法調用響應時間有較嚴格要求的狀況下,異步調用,當即返回
         * 不一樣於同步的惟一在於: send方法調用的時候多攜帶一個回調接口參數,用來異步處理消息發送結果
         */
        asyncProducer();
        
        /**
         * 單向模式: 通常用來對可靠性有必定要求的消息發送,例如日誌系統
         * 不一樣於同步的惟一之處在於: 調用的是sendOneway方法,且該方法不會給調用者任何返回值
         */
        // oneWayProducer();
    }

    private static void oneWayProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 建立Producer而且指定組名
        DefaultMQProducer oneWayProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        oneWayProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 啓動Producer
        oneWayProducer.start();

        // STEP4: 循環發送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("OneWayTopic", "TagA",
                    ("OneWayMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            oneWayProducer.sendOneway(message);
        }

        // STEP5: 關閉Producer
        oneWayProducer.shutdown();
    }

    private static void asyncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 建立Producer而且指定組名
        DefaultMQProducer asyncProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        asyncProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 啓動Producer
        asyncProducer.start();
        asyncProducer.setRetryTimesWhenSendAsyncFailed(0);      // 設置異步發送失敗重試次數,默認爲2

        // STEP4: 循環發送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("AsyncTopic", "TagA",
                    ("AsyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 建立回調函數處理髮送成功或者異常
            asyncProducer.send(message, new SendCallback() {
                
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                
                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }

        // STEP5: 關閉Producer
        TimeUnit.SECONDS.sleep(10); // 睡眠10秒,確保消息都發送出去
        asyncProducer.shutdown();
    }

    private static void syncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 建立Producer而且指定組名
        DefaultMQProducer syncProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        syncProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 啓動Producer
        syncProducer.start();

        // STEP4: 循環發送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("SyncTopic", "TagA",
                    ("SyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = syncProducer.send(message);
            System.out.println(sendResult);
        }
        
        // STEP5: 關閉Producer
        syncProducer.shutdown();
    }
}

Consumer

​ consumer的實現就較爲簡單了,定義一個事件監聽接口便可.

public class SimpleConsumer {
    public static void main(String[] args) throws MQClientException {
        // STEP1: 建立默認Consumer並指定
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        
        // STEP2: 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        
        // STEP3: 訂閱對應主題和tag
        consumer.subscribe("AsyncTopic", "*");
        
        // STEP4: 註冊接收到broker消息後的處理接口
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(new String(msgs.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        // STEP5: 啓動consumer (必須在註冊完消息監聽器以後啓動,不然會報錯)
        consumer.start();
        
        System.out.println("Consumer started......");
    }
}

總結

  • 運行Producer的時候必須保證nameServer和broker都正常運行,不然會報org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
  • 即便先運行Producer只要在運行Consumer以前,未重啓broker或者nameServer。Consumer啓動時仍是能正常收到消息

參考連接

http://rocketmq.apache.org/docs/simple-example/

相關文章
相關標籤/搜索