RocketMQ入門篇

系列文章

RocketMQ入門篇
RocketMQ生產者流程篇
RocketMQ生產者消息篇java

RocketMQ總體結構

圖片描述

如上圖所示,總體能夠分紅4個角色,分別是:Producer,Consumer,Broker以及NameServer;git

1.NameServer

能夠理解爲是消息隊列的協調者,Broker向它註冊路由信息,同時Client向其獲取路由信息,若是使用過Zookeeper,就比較容易理解了,可是功能比Zookeeper弱;
NameServer自己是沒有狀態的,而且多個NameServer直接並無通訊,能夠橫向擴展多臺,Broker會和每一臺NameServer創建長鏈接;github

2.Broker

Broker是RocketMQ的核心,提供了消息的接收,存儲,拉取等功能,通常都須要保證Broker的高可用,因此會配置Broker Slave,當Master掛掉以後,Consumer而後能夠消費Slave;
Broker分爲Master和Slave,一個Master能夠對應多個Slave,Master與Slave的對應關係經過指定相同的BrokerName,不一樣的BrokerId來定義,BrokerId爲0表示Master,非0表示Slave;apache

3.Producer

消息隊列的生產者,須要與NameServer創建鏈接,從NameServer獲取Topic路由信息,並向提供Topic服務的Broker Master創建鏈接;Producer無狀態,看集羣部署;segmentfault

4.Consumer

消息隊列的消費者,一樣與NameServer創建鏈接,從NameServer獲取Topic路由信息,並向提供Topic服務的Broker Master,Slave創建鏈接;windows

5.Topic和Message Queue

在介紹完以上4個角色之後,還須要重點介紹一下上面提到的Topic和Message Queue;字面意思就是主題,用來區分不一樣類型的消息,發送和接收消息前都須要先建立Topic,針對Topic來發送和接收消息,爲了提升性能和吞吐量,引入了Message Queue,一個Topic能夠設置一個或多個Message Queue,有點相似kafka的分區(Partition),這樣消息就能夠並行往各個Message Queue發送消息,消費者也能夠並行的從多個Message Queue讀取消息;centos

單機配置和部署

如下部署在centos7,jdk1.8,rocketmq4.3.2;啓動RocketMQ的順序是先啓動NameServer,而後再啓動Broker;app

1.NameServer啓動

[root@localhost bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

如上日誌表示啓動成功,默認端口爲9876;dom

2.Broker啓動

[root@localhost bin]# ./mqbroker
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketmq-all-4.3.2-bin-release/bin/hs_err_pid3977.log

啓動失敗,報內存不足,主要是rocketmq默認配置的啓動參數值比較大,修改runbroker.sh便可異步

[root@localhost bin]# vi runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

默認配置的可用內存爲8g,虛擬機內存不夠,修改成以下便可

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"

再次啓動,日誌以下,表示啓動成功,默認端口爲10911;

[root@localhost bin]# ./mqbroker
The broker[localhost.localdomain, 192.168.237.128:10911] boot success. serializeType=JSON

3.簡單測試

3.1生產者

public class SyncProducer {

     public static void main(String[] args) throws Exception {
           // 構造Producer
           DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
           producer.setNamesrvAddr("192.168.237.128:9876");
           // 初始化Producer,整個應用生命週期內,只須要初始化1次
           producer.start();
           for (int i = 0; i < 100; i++) {
                Message msg = new Message("TopicTest", "TagA",
                           ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
           }
           producer.shutdown();
     }
}

建立了一個DefaultMQProducer對象,同時設置了GroupName和NameServer地址,而後建立Message消息經過DefaultMQProducer將消息發送出去,返回一個SendResult對象;

3.2消費者

public class PushConsumer {

     public static void main(String[] args) throws MQClientException {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please rename to unique group name");
           consumer.setNamesrvAddr("192.168.237.128:9876");
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           consumer.subscribe("TopicTest", "*");
           consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                     System.out.printf(Thread.currentThread().getName() + "Receive New Messages :" + msgs + "%n");
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
           });
           consumer.start();
     }
}

一樣指定了GroupName和NameServer地址,訂閱了Topic;

3.3運行測試

直接運行生產者報以下錯誤:

Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest
See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1253)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1203)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
    at com.rocketmq.SyncProducer.main(SyncProducer.java:26)

錯誤顯示"沒有此Topic的路由信息",也就是生產者在發送消息的時候沒有獲取到路由信息,找不到指定的Broker,可能的緣由:
1.Broker沒有正確鏈接NameServer
2.Producer沒有鏈接NameServer
3.Topic沒有被正確建立
SyncProducer中指定了NameServer的地址,同時RocketMQ默認狀況下會自動建立Topic,因此緣由是Broker沒有註冊到NameServer,從新指定NameServer再啓動:

[root@localhost bin]# ./mqbroker -n localhost:9876
The broker[localhost.localdomain, 192.168.237.128:10911] boot success. serializeType=JSON and name server is localhost:9876

再次運行SyncProducer,日誌以下:

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4C60000, offsetMsgId=C0A8ED8000002A9F000000000000229C, messageQueue=MessageQueue[topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=11]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4CD0001, offsetMsgId=C0A8ED8000002A9F000000000000234D, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=9]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4D90002, offsetMsgId=C0A8ED8000002A9F00000000000023FE, messageQueue=MessageQueue[topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=9]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4E80003, offsetMsgId=C0A8ED8000002A9F00000000000024AF, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=11]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4F40004, offsetMsgId=C0A8ED8000002A9F0000000000002560, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=12]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4F70005, offsetMsgId=C0A8ED8000002A9F0000000000002611, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=10]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C5030006, offsetMsgId=C0A8ED8000002A9F00000000000026C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=10]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C5070007, offsetMsgId=C0A8ED8000002A9F0000000000002773, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=12]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C50A0008, offsetMsgId=C0A8ED8000002A9F0000000000002824, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=13]

SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C50D0009, offsetMsgId=C0A8ED8000002A9F00000000000028D5, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=11]

消費者使用的是push模式,能夠實時接受消息:

ConsumeMessageThread_13Receive New Messages :[MessageExt [queueId=1, storeSize=177,queueOffset=11, sysFlag=0, bornTimestamp=1547086138566, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430770, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F000000000000229C, commitLogOffset=8860, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=12, CONSUME_START_TIME=1547086138573, UNIQ_KEY=0A0D53073B6073D16E933086C4C60000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]

ConsumeMessageThread_3Receive New Messages :[MessageExt [queueId=2, storeSize=177, queueOffset=9, sysFlag=0, bornTimestamp=1547086138573, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430783, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F000000000000234D, commitLogOffset=9037, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1547086138598, UNIQ_KEY=0A0D53073B6073D16E933086C4CD0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]

ConsumeMessageThread_17Receive New Messages :[MessageExt [queueId=3, storeSize=177, queueOffset=9, sysFlag=0, bornTimestamp=1547086138585, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430794, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F00000000000023FE, commitLogOffset=9214, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1547086138601, UNIQ_KEY=0A0D53073B6073D16E933086C4D90002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]

ConsumeMessageThread_9Receive New Messages :[MessageExt [queueId=0, storeSize=177, queueOffset=11, sysFlag=0, bornTimestamp=1547086138600, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430807, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F00000000000024AF, commitLogOffset=9391, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=12, CONSUME_START_TIME=1547086138612, UNIQ_KEY=0A0D53073B6073D16E933086C4E80003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId='null'}]]

ConsumeMessageThread_15Receive New Messages :[MessageExt [queueId=1, storeSize=177, queueOffset=12, sysFlag=0, bornTimestamp=1547086138612, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430809, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F0000000000002560, commitLogOffset=9568, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=13, CONSUME_START_TIME=1547086138626, UNIQ_KEY=0A0D53073B6073D16E933086C4F40004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId='null'}]]

ConsumeMessageThread_11Receive New Messages :[MessageExt [queueId=2, storeSize=177, queueOffset=10, sysFlag=0, bornTimestamp=1547086138615, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430820, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F0000000000002611, commitLogOffset=9745, bodyCRC=1516469518, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, CONSUME_START_TIME=1547086138628, UNIQ_KEY=0A0D53073B6073D16E933086C4F70005, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 53], transactionId='null'}]]

ConsumeMessageThread_4Receive New Messages :[MessageExt [queueId=3, storeSize=177,queueOffset=10, sysFlag=0, bornTimestamp=1547086138627, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430824, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F00000000000026C2,commitLogOffset=9922, bodyCRC=1131031732,reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, CONSUME_START_TIME=1547086138633, UNIQ_KEY=0A0D53073B6073D16E933086C5030006, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 54], transactionId='null'}]]

ConsumeMessageThread_14Receive New Messages :[MessageExt [queueId=0, storeSize=177, queueOffset=12, sysFlag=0, bornTimestamp=1547086138631, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430827, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F0000000000002773, commitLogOffset=10099, bodyCRC=879565858, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=13, CONSUME_START_TIME=1547086138635, UNIQ_KEY=0A0D53073B6073D16E933086C5070007, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 55], transactionId='null'}]]

ConsumeMessageThread_10Receive New Messages :[MessageExt [queueId=1, storeSize=177, queueOffset=13, sysFlag=0, bornTimestamp=1547086138634, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430830, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F0000000000002824, commitLogOffset=10276, bodyCRC=617742771, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=14, CONSUME_START_TIME=1547086138638, UNIQ_KEY=0A0D53073B6073D16E933086C50A0008, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 56], transactionId='null'}]]

ConsumeMessageThread_7Receive New Messages :[MessageExt [queueId=2, storeSize=177, queueOffset=11, sysFlag=0, bornTimestamp=1547086138637, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430833, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F00000000000028D5, commitLogOffset=10453, bodyCRC=1406480677, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=12, CONSUME_START_TIME=1547086138641, UNIQ_KEY=0A0D53073B6073D16E933086C50D0009, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 57], transactionId='null'}]]

多機集羣配置和部署

分別部署兩臺NameServer,兩臺Broker而且分別提供Slave,準備兩臺電腦分別是本機的windows以及虛擬機centos;

1.啓動NameServer

分別啓動2臺NameServer,端口號都使用默認的9876,地址端口以下:

192.168.237.128:9876
10.13.83.7:9876

2.啓動Broker

每臺機器上分別啓動一個Master和一個Slave,互爲主備,在主目錄下的conf文件夾下提供了多種broker配置模式,分別有:2m-2s-async,2m-2s-sync,2m-noslave,能夠以此爲模版作以下配置:

2.1配置10.13.83.7Master和Slave

Master配置以下:

namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=E:/rocketmq-all-4.3.2-bin-release/store-a-m

Slave配置以下:

namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10811
storePathRootDir=E:/rocketmq-all-4.3.2-bin-release/store-a-s

分別啓動結果以下:

E:\rocketmq-all-4.3.2-bin-release\bin>mqbroker -c E:\rocketmq-all-4.3.2-bin-rele
ase\conf\broker-m.conf
The broker[broker-a, 10.13.83.7:10911] boot success. serializeType=JSON and name
 server is 192.168.237.128:9876;10.13.83.7:9876

以上是Master啓動日誌,Slave日誌以下:

E:\rocketmq-all-4.3.2-bin-release\bin>mqbroker -c E:\rocketmq-all-4.3.2-bin-rele
ase\conf\broker-s.conf
The broker[broker-a, 10.13.83.7:10811] boot success. serializeType=JSON and name
 server is 192.168.237.128:9876;10.13.83.7:9876

2.2配置10.13.83.7Slave

Master配置以下:

namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/root/rocketmq-all-4.3.2-bin-release/store-b-m

Slave配置以下:

namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10811
storePathRootDir=/root/rocketmq-all-4.3.2-bin-release/store-b-s

啓動日誌分別以下:

[root@localhost bin]# ./mqbroker -c ../conf/broker-m.conf 
The broker[broker-b, 192.168.237.128:10911] boot success. serializeType=JSON and name server is 192.168.237.128:9876;10.13.83.7:9876
[root@localhost bin]# ./mqbroker -c ../conf/broker-s.conf 
The broker[broker-b, 192.168.237.128:10811] boot success. serializeType=JSON and name server is 192.168.237.128:9876;10.13.83.7:9876

3.配置說明

1.namesrvAddr

NameServer地址,能夠配置多個,用逗號分隔;

2.brokerClusterName

所屬集羣名稱,若是節點較多能夠配置多個

3.brokerName

broker名稱,master和slave使用相同的名稱,代表他們的主從關係

4.brokerId

0表示Master,大於0表示不一樣的slave

5.deleteWhen

表示幾點作消息刪除動做,默認是凌晨4點

6.fileReservedTime

在磁盤上保留消息的時長,單位是小時

7.brokerRole

有三個值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和異步表示Master和Slave之間同步數據的機制;

8.flushDiskType

刷盤策略,取值爲:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盤和異步刷盤;SYNC_FLUSH消息寫入磁盤後才返回成功狀態,ASYNC_FLUSH不須要;

9.listenPort

啓動監聽的端口號

10.storePathRootDir

存儲消息的根目錄

管理工具

1.命令行管理工具

mqadmin是RocketMQ自帶的命令行管理工具,能夠建立、修改Topic,查詢消息,更新配置信息等操做,具體能夠經過以下命令查看:

E:\rocketmq-all-4.3.2-bin-release\bin>mqadmin
The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client vers
   consumerConnection   Query consumer's socket connection, client version
ubscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client
t).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.
   sendMessage          Send a message
   consumeMessage       Consume message

See 'mqadmin help <command>' for more information on a specific command.

列出了全部支持的命令以及簡單的介紹,若是想看詳細的能夠以下命令:

E:\rocketmq-all-4.3.2-bin-release\bin>mqadmin help statsAll
usage: mqadmin statsAll [-a] [-h] [-n <arg>] [-t <arg>]
 -a,--activeTopic         print active topic only
 -h,--help                Print help
 -n,--namesrvAddr <arg>   Name server address list, eg: 192.168.0.1:9876;192.168
.0.2:9876
 -t,--topic <arg>         print select topic only

2.圖形界面管理工具

除了命令,還提供了圖形界面管理工具,在RocketMQ的擴展包裏面,具體地址以下:

https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0/rocketmq-console

目前的穩定版本是1.0.0,能夠下載下來在本地運行,對application.properties作簡單配置:

rocketmq.config.namesrvAddr=10.13.83.7:9876

須要指定NameServer的地址,而後就能夠打包運行了,運做以後會啓動8080端口,直接訪問地址:

http://localhost:8080

圖片描述

總結

本文從最簡單的安裝部署入手,並對經常使用的配置參數作了簡單介紹;而後瞭解了RocketMQ的部署的總體結構,分別對其中的角色作了簡單介紹;最後介紹了兩種RocketMQ的管理工具,方便對RocketMQ的監控和管理。

相關文章
相關標籤/搜索