學習RocketMQ
的第一天,應該從官網的QuickStart
案例開始,這一節就來介紹一下如何部署單機RocketMQ
以及進行消息的收發。java
使用RocketMQ
須要有以下的硬件要求:git
瞭解版本說明以後,咱們就能夠開始進行實戰了。github
Ps: RocketMQ
版本爲Release.4.7.0
。apache
打開RocketMQ
在Github
上的主頁,獲取倉庫地址。而後在本地電腦上克隆本倉庫。服務器
git clone https://github.com/apache/rocketmq.git
打開項目後,第一步要作的是啓動nameserver
,這是RocketMQ
的路由中心,它提供輕量級服務發現和路由,主要的做用是存儲路由信息,管理broker
節點,包括路由的查找、註冊和刪除。app
在RocketMQ
工程的namesrv
包中找到入口類org.apache.rocketmq.namesrv.NamesrvStartup
,運行這個類的main
函數,發現報錯了。ide
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
這個報錯是由於在爲nameserver
設置相關配置時沒有設置成功。函數
if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
ROCKETMQ_HOME
環境變量主要用於設置nameserver
的配置,只須要將包含conf
配置目錄的這個路徑賦值給環境變量ROCKETMQ_HOME
便可,以下圖。學習
再次運行main
函數,就會發現啓動成功。ui
The Name Server boot success. serializeType=JSON
接下來要啓動的是broker
,它主要用於消息存儲,接收和發送。
一樣在RocketMQ
工程的broker
包中找到入口類org.apache.rocketmq.broker.BrokerStartup
,可是與啓動nameserver
不一樣的是,啓動broker
時須要指定註冊的nameserver
地址,在啓動命令中輸入-n 127.0.0.1:9876
便可。
運行main
函數,若是發現與以前同樣的報錯,從新設置該Application
環境變量便可,運行成功的輸出以下。
The broker[daxiongMac.local, 192.168.31.126:10911] boot success. serializeType=JSON and name server is namesrvAddr=127.0.0.1:9876
至此,RocketMQ
的路由中心和接收發消息的服務器就啓動成功了,咱們能夠經過nameserver
和broker
來進行消息傳遞了。
找到example
包的org.apache.rocketmq.example.quickstart.Producer
類,這是一個最簡單的消息生產者,咱們來看一下它的源碼。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
DefaultMQProducer
類來建立生產者實例,並指定消息組Group
和路由中心地址。Topic
和Tag
(用於區分消息的類別)。21:22:35.450 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E4E0000, offsetMsgId=C0A81F7E00002A9F0000000000068BA2, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=1], queueOffset=500] SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E810001, offsetMsgId=C0A81F7E00002A9F0000000000068C54, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=2], queueOffset=500] SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E850002, offsetMsgId=C0A81F7E00002A9F0000000000068D06, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=3], queueOffset=500] ......
這樣就實現了RocketMQ
的發送消息。
找到example
包的org.apache.rocketmq.example.quickstart.Consumer
類,這是一個最簡單的消息消費者,咱們來看一下它的源碼。
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
DefaultMQPushConsumer
類來建立消費者實例,並指定消息組Group
、路由中心地址、消費模式、消息類別。21:24:03.482 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=2, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756319, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756321, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F00000000000691E4, commitLogOffset=430564, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844575, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9F0009, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]] ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=1, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756316, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756317, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F0000000000069132, commitLogOffset=430386, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844576, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9C0008, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]] ......
至此,咱們就完成了RocketMQ
的快速入門,啓動nameserver
和broker
,建立生產者發送消息,建立消費者接收消息。
版權聲明:本文爲 Planeswalker23所創,轉載請帶上原文連接,感謝。