消息中間件 RocketMQ 源碼解析 —— 調試環境搭建

摘要: 原創出處 www.iocoder.cn/RocketMQ/bu… 「芋道源碼」歡迎轉載,保留摘要,謝謝!html


0. 友情提示

閱讀源碼以前,建議胖友對 RocketMQ 的文檔已經熟讀。目前 RocketMQ 4 的中文文檔不多,因此英文不太好的胖友,推薦看看以下資料:java

1. 依賴工具

  • JDK :1.8+
  • Maven
  • IntelliJ IDEA

2. 源碼拉取

從官方倉庫 [github.com/apache/rock…) Fork 出屬於本身的倉庫。爲何要 Fork ?既然開始閱讀、調試源碼,咱們可能會寫一些註釋,有了本身的倉庫,能夠進行自由的提交。😈git

使用 IntelliJ IDEAFork 出來的倉庫拉取代碼。拉取完成後,Maven 會下載依賴包,可能會花費一些時間,耐心等待下。github


在等待的過程當中,我來簡單說下,搭建調試環境的過程:面試

  1. 啓動 RocketMQ Namesrv
  2. 啓動 RocketMQ Broker
  3. 啓動 RocketMQ Producer
  4. 啓動 RocketMQ Consumer

最小化的 RocketMQ 的環境,暫時不考慮 Namesrv 集羣、Broker 集羣、Consumer 集羣。數據庫

😈 另外,本文使用的 RocketMQ 版本是 4.4.0-SNAPSHOTapache

3. 啓動 RocketMQ Namesrv

打開 org.apache.rocketmq.namesrv.NameServerInstanceTest 單元測試類,參考 #startup() 方法,咱們編寫 #main(String[] args) 靜態方法,代碼以下:ide

// NameServerInstanceTest.java

public static void main(String[] args) throws Exception {
    // NamesrvConfig 配置
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // NettyServerConfig 配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876); // 設置端口
    // 建立 NamesrvController 對象,並啓動
    NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    namesrvController.initialize();
    namesrvController.start();
    // 睡覺,就不起來
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}
複製代碼

而後,右鍵運行,RocketMQ Namesrv 就啓動完成。輸出日誌以下:工具

17:54:03.354 [NettyEventExecutor] INFO  RocketmqRemoting - NettyEventExecutor service started
17:54:03.355 [FileWatchService] INFO  RocketmqCommon - FileWatchService service started
複製代碼

最後,這是一個可選的步驟,命令行中輸入 telnet 127.0.0.1 9876 ,看看是否能鏈接上 RocketMQ Namesrv 。單元測試

4. 啓動 RocketMQ Broker

打開 org.apache.rocketmq.broker.BrokerControllerTest 單元測試類,參考 #testBrokerRestart() 方法,咱們編寫 #main(String[] args) 方法,代碼以下:

// BrokerControllerTest.java

public static void main(String[] args) throws Exception {
    // 設置版本號
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // NettyServerConfig 配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);
    // BrokerConfig 配置
    final BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setNamesrvAddr("127.0.0.1:9876");
    // MessageStoreConfig 配置
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);

// BrokerPathConfigHelper.setBrokerConfigPath("/Users/yunai/百度雲同步盤/開發/Javascript/Story/incubator-rocketmq/conf/broker.conf");
    // 建立 BrokerController 對象,並啓動
    BrokerController brokerController = new BrokerController(//
            brokerConfig, //
            nettyServerConfig, //
            new NettyClientConfig(), //
            messageStoreConfig);
    brokerController.initialize();
    brokerController.start();
    // 睡覺,就不起來
    System.out.println("你猜");
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}
複製代碼

而後,右鍵運行,RocketMQ Broker 就啓動完成了。輸出日誌以下:

你猜
複製代碼
  • 不要懵逼,咱們打開下 RocketMQ Namesrv 那,已經輸出日誌以下:

    18:17:30.443 [NettyServerCodecThread_5] INFO  RocketmqRemoting - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:63847
    18:17:30.443 [NettyServerCodecThread_5] INFO  RocketmqRemoting - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:63847]
    18:17:30.457 [RemotingExecutorThread_4] DEBUG RocketmqNamesrv - receive request, 103 127.0.0.1:63847 RemotingCommand [code=103, language=JAVA, version=275, opaque=0, flag(B)=0, remark=null, extFields={brokerId=0, bodyCrc32=1880081823, clusterName=DefaultCluster, brokerAddr=192.168.3.26:10911, haServerAddr=192.168.3.26:10912, compressed=false, brokerName=broker-a}, serializeTypeCurrentRPC=JSON]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, BenchmarkTest QueueData [brokerName=broker-a, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new broker registered, 192.168.3.26:10911 HAServer: 192.168.3.26:10912
    複製代碼
    • 妥妥的,原來 RocketMQ Broker 已經啓動完成,而且註冊到 RocketMQ Namesrv 上。

最後,這是一個可選的步驟,命令行中輸入 telnet 127.0.0.1 10911 ,看看是否能鏈接上 RocketMQ Broker 。

5. 啓動 RocketMQ Producer

打開 org.apache.rocketmq.example.quickstart.Producer 示例類,代碼以下:

// Producer.java

/** * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}. */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /* * Instantiate with a producer group name. */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */

        /* * Launch the instance. */
        producer.setNamesrvAddr("127.0.0.1:9876"); // <x> 哈哈哈哈
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /* * Create a message instance, specifying topic, tag and message body. */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /* * Call send message to deliver message to one of brokers. */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /* * Shut down once the producer instance is not longer in use. */
        producer.shutdown();
    }

}
複製代碼
  • 注意,在 <x> 哈哈哈哈處,咱們增長了 producer.setNamesrvAddr("127.0.0.1:9876") 代碼塊,指明 Producer 使用的 RocketMQ Namesrv 。
  • 😈 可能會有胖友會問,爲何指定的不是 RocketMQ Broker 呢?請退回到 「0. 友情提示」

而後,右鍵運行,RocketMQ Producer 就啓動完成。輸出日誌以下:

18:22:13.507 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6364050000, offsetMsgId=C0A8031A00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
// ... 中間省略 N 條 ... 
SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6369F603E6, offsetMsgId=C0A8031A00002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6369F703E7, offsetMsgId=C0A8031A00002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=249]
18:22:15.558 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.3.26:10911] result: true
18:22:15.559 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
18:22:15.560 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.3.26:10909] result: true
複製代碼

沒有最後。

6. 啓動 RocketMQ Consumer

打開 org.apache.rocketmq.example.quickstart.Consumer 示例類,代碼以下:

// Consumer.java

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /* * Instantiate with specified consumer group name. */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */

        /* * Specify where to start in case the specified consumer group is a brand new one. */
        consumer.setNamesrvAddr("127.0.0.1:9876"); // <x> 哈哈哈哈
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /* * Subscribe one more more topics to consume. */
        consumer.subscribe("TopicTest", "*");

        /* * Register callback to execute on arrival of messages fetched from brokers. */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /* * Launch the consumer instance. */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

}
複製代碼
  • 注意,在 <x> 哈哈哈哈處,咱們還增長了 consumer.setNamesrvAddr("127.0.0.1:9876") 代碼塊,指明 Consumer 使用的 RocketMQ Namesrv 。
  • 😈 再來一道送命題,爲何指定的不是 RocketMQ Broker 呢?

而後,右鍵運行,RocketMQ Consumer 就啓動完成。輸入日誌以下:

18:37:12.196 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1543054934061, bornHost=/192.168.3.26:64103, storeTimestamp=1543054934065, storeHost=/192.168.3.26:10911, msgId=C0A8031A00002A9F0000000000000164, commitLogOffset=356, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1543055832771, UNIQ_KEY=C0A8031AE91718B4AAC27A63642D0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=4, sysFlag=0, bornTimestamp=1543054934102, bornHost=/192.168.3.26:64103, storeTimestamp=1543054934103, storeHost=/192.168.3.26:10911, msgId=C0A8031A00002A9F0000000000000BD9, commitLogOffset=3033, bodyCRC=367242165, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1543055832779, UNIQ_KEY=C0A8031AE91718B4AAC27A6364560011, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 55], transactionId='null'}]] 
// ... 中間省略 N 條 ... 
CONSUME_START_TIME=1543055832779, UNIQ_KEY=C0A8031AE91718B4AAC27A636450000F, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 53], transactionId='null'}]] 
複製代碼

沒有最後。

666. 彩蛋

😈 一直想寫這篇,一直忘記掉。

妥妥的,徐媽是最胖的。

仍是那句話,必定必定必定要看 「0. 友情提示」 提供的文檔。先懂原理,才能更好的讀懂源碼。

源碼,是原理的具象化
原理,是代碼的抽象化

相關文章
相關標籤/搜索