RocketMQ做爲一款純java、分佈式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。java
如圖所示爲RocketMQ基本的部署結構,主要分爲NameServer集羣、Broker集羣、Producer集羣和Consumer集羣四個部分。git
Broker在啓動的時候會去向NameServer註冊而且定時發送心跳,Producer在啓動的時候會到NameServer上去拉取Topic所屬的Broker具體地址,而後向具體的Broker發送消息github
NameServer的做用是Broker的註冊中心。web
每一個NameServer節點互相之間是獨立的,沒有任何信息交互,也就不存在任何的選主或者主從切換之類的問題,所以NameServer是很輕量級的。單個NameServer節點中存儲了活躍的Broker列表(包括master和slave),這裏活躍的定義是與NameServer保持有心跳。spring
Topic 與 Tag 都是業務上用來歸類的標識,區分在於 Topic 是一級分類,而 Tag 能夠理解爲是二級分類數據庫
Topic是生產者在發送消息和消費者在拉取消息的類別。Topic與生產者和消費者之間的關係很是鬆散。一個生產者能夠發送不一樣類型Topic的消息。消費者組能夠訂閱一個或多個主題,只要該組的實例保持其訂閱一致便可。apache
咱們能夠理解爲Topic是第一級消息類型,好比一個電商系統的消息能夠分爲:交易消息、物流消息等,一條消息必須有一個Topic。數組
意思就是子主題,爲用戶提供了額外的靈活性。有了標籤,方便RocketMQ提供的查詢功能。springboot
能夠理解爲第二級消息類型,交易建立消息,交易完成消息..... 一條消息能夠沒有Tag服務器
一個topic下,能夠設置多個queue(消息隊列),默認4個隊列。當咱們發送消息時,須要要指定該消息的topic。
RocketMQ會輪詢該topic下的全部隊列,將消息發送出去。
在 RocketMQ 中,全部消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每一個存儲單元都是定長,訪問其中的存儲單元使用 Offset 來訪問,offset 爲 java long 類型,64 位,理論上在 100年內不會溢出,因此認爲是長度無限。
也能夠認爲 Message Queue 是一個長度無限的數組,Offset 就是下標。
RocketMQ中也有組的概念。表明具備相同角色的生產者組合或消費者組合,稱爲生產者組或消費者組。
做用是在集羣HA的狀況下,一個生產者down以後,本地事務回滾後,能夠繼續聯繫該組下的另一個生產者實例,不至於致使業務走不下去。在消費者組中,能夠實現消息消費的負載均衡和消息容錯目標。
有了GroupName,在集羣下,動態擴展容量很方便。只須要在新加的機器中,配置相同的GroupName。啓動後,就當即能加入到所在的羣組中,參與消息生產或消費。
Broker是具體提供業務的服務器,單個Broker節點與全部的NameServer節點保持長鏈接及心跳,定時(每隔30s)註冊Topic信息到全部Name Server。Name Server定時(每隔10s)掃描全部存活broker的鏈接,若是Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的鏈接。底層的通訊和鏈接都是基於Netty實現的。
負載均衡:Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上,會自動輪詢當前全部可發送的broker ,儘可能平均分佈到全部隊列中,最終效果就是全部消息都平均落在每一個Broker上。
高可用:Broker中分master和slave兩種角色,每一個master能夠對應多個slave,但一個slave只能對應一個master,master和slave經過指定相同的Brokername組成,其中不一樣的BrokerId==0 是master,非0是slave。
高可靠併發讀寫服務:master和slave之間的同步方式分爲同步雙寫和異步複製,異步複製方式master和slave之間雖然會存在少許的延遲,但性能較同步雙寫方式要高出10%左右。
單個Producer和一臺NameServer節點(隨機選擇)保持長鏈接,定時查詢topic配置信息,若是該NameServer掛掉,生產者會自動鏈接下一個NameServer,直到有可用鏈接爲止,並能自動重連。與NameServer之間沒有心跳。
單個Producer和與其關聯的全部broker保持長鏈接,並維持心跳。默認狀況下消息發送採用輪詢方式,會均勻發到對應Topic的全部queue中。
單個Consumer和一臺NameServer保持長鏈接,定時查詢topic配置信息,若是該NameServer掛掉,消費者會自動鏈接下一個NameServer,直到有可用鏈接爲止,並能自動重連。與NameServer之間沒有心跳。
單個Consumer和與其關聯的全部broker保持長鏈接,並維持心跳,失去心跳後,則關閉鏈接,並向該消費者分組的全部消費者發出通知,分組內消費者從新分配隊列繼續消費。
在默認狀況下,就是集羣消費,此時消息發出去後將只有一個消費者能獲取消息。
廣播消費,一條消息被多個Consumer消費。消息會發給Consume Group中的全部消費者進行消費。
消息的順序指的是消息消費時,能按照發送的順序來消費。
RocketMQ是經過將「相同ID的消息發送到同一個隊列,而一個隊列的消息只由一個消費者處理「來實現順序消息
消息領域有一個對消息投遞的QoS(服務質量)定義,分爲:最多一次(At most once)、至少一次(At least once)、僅一次( Exactly once)。
MQ產品都聲稱本身作到了At least once。既然是至少一次,就有可能發生消息重複。
有不少緣由致使,好比:網絡緣由閃斷,ACK返回失敗等等故障,確認信息沒有傳送到消息隊列,致使消息隊列不知道本身已經消費過該消息了,再次將該消息分發給其餘的消費者
不一樣的消息隊列發送的確認信息形式不一樣:RocketMQ返回一個CONSUME_SUCCESS成功標誌,RabbitMQ是發送一個ACK確認消息
冪等性:就是用戶對於同一操做發起的一次請求或者屢次請求的結果是一致的,不會由於屢次點擊而產生了反作用,數據庫的結果都是惟一的,不可變的。
去重策略:保證每條消息都有惟一編號(好比惟一流水號),且保證消息處理成功與去重表的日誌同時出現。
好比如秒殺等大型活動時會帶來較高的流量脈衝,若是沒作相應的保護,將致使系統超負荷甚至崩潰。若是因限制太過致使請求大量失敗而影響用戶體驗,能夠利用MQ 超高性能的消息處理能力來解決。
經過上、下游業務系統的鬆耦合設計,好比:交易系統的下游子系統(如積分等)出現不可用甚至宕機,都不會影響到核心交易系統的正常運轉。
FIFO原理相似,MQ提供的順序消息即保證消息的先進先出,能夠應用於交易系統中的訂單建立、支付、退款等流程。
好比阿里的交易系統、支付紅包等場景須要確保數據的最終一致性,須要引入 MQ 的分佈式事務,既實現了系統之間的解耦,又能夠保證最終的數據一致性。
優勢:配置簡單,方便部署
缺點:風險較大,一旦Broker重啓或者宕機,會致使整個服務不可用
一個集羣無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master
優勢:配置簡單,單個Master宕機重啓對應用沒有影響。消息不會丟失
缺點:單臺機器宕機期間,這臺機器上沒有被消費的消息在恢復以前不可訂閱,消息實時性會受到影響。
每一個Master配置一個Slave,採用異步複製方式,主備有短暫消息延遲
優勢:由於Master 宕機後,消費者仍然能夠從 Slave消費,此過程對應用透明。不須要人工干預。性能同多 Master 模式幾乎同樣。
缺點:Master宕機後,會丟失少許信息
每一個Master配置一個Slave,採用同步雙寫方式,只有主和備都寫成功,才返回成功
優勢:數據與服務都無單點, Master宕機狀況下,消息無延遲,服務可用性與數據可用性都很是高
缺點:性能比異步複製模式略低,大約低 10%左右,發送單個消息的 RT會略高。目前主宕機後,備機不能自動切換爲主機,後續會支持自動切換功能
建立一個maven工程,導入依賴
<dependencies> <!--rocket--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <!--順序消息中,模擬了一個消息集合,加入了lombok--> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> </dependencies>
/** * 普通消息生產者 */ public class Producer { public static void main(String[] args) throws Exception { // 建立一個消息發送入口對象,主要用於消息發送,指定生產者組 DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); // 設置NameServe地址,若是是集羣環境,用分號隔開 producer.setNamesrvAddr("127.0.0.1:9876"); // 啓動並建立消息發送組件 producer.start(); // topic的名字 String topic = "rocketDemo1"; // 標籤名 String taget = "tag"; // 要發送的數據 String body = "hello,RocketMq"; Message message = new Message(topic,taget,body.getBytes()); // 發送消息 SendResult result = producer.send(message); System.out.println(result); // 關閉消息發送對象 producer.shutdown(); } }
/** * 普通消息消費者 */ public class Consumer { public static void main(String[] args) throws Exception { // 建立一個消費管理對象,並建立消費者組名字 DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("ConsumerGroup"); // 設置NameServer地址,若是是集羣環境,用逗號分隔 consumerGroup.setNamesrvAddr("127.0.0.1:9876"); // 設置要讀取的消息主題和標籤 consumerGroup.subscribe("rocketDemo1", "*"); // 設置回調函數,處理消息 //注意:MessageListenerConcurrently -- 並行消費監聽 consumerGroup.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { //讀取消息記錄 for (MessageExt messageExt : msgs) { //獲取消息主題 String topic = messageExt.getTopic(); //獲取消息標籤 String tags = messageExt.getTags(); //獲取消息體內容 String body = new String(messageExt.getBody(), "UTF-8"); System.out.println("topic:" + topic + ",tags:" + tags + ",body:" + body); } } catch (Exception e) { e.printStackTrace(); } //返回消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 運行消息消費對象 consumerGroup.start(); } }
消息有序指的是能夠按照消息的發送順序來消費。RocketMQ是經過將「相同ID的消息發送到同一個隊列,而一個隊列的消息只由一個消費者處理「來實現順序消息 。
如何保證順序
/** * 模擬消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long orderId; private String desc; public static List<Order> buildOrders(){ List<Order> list = new ArrayList<Order>(); Order order1001a = new Order(1001L,"建立"); Order order1004a = new Order(1004L,"建立"); Order order1006a = new Order(1006L,"建立"); Order order1009a = new Order(1009L,"建立"); list.add(order1001a); list.add(order1004a); list.add(order1006a); list.add(order1009a); Order order1001b = new Order(1001L,"付款"); Order order1004b = new Order(1004L,"付款"); Order order1006b = new Order(1006L,"付款"); Order order1009b = new Order(1009L,"付款"); list.add(order1001b); list.add(order1004b); list.add(order1006b); list.add(order1009b); Order order1001c = new Order(1001L,"完成"); Order order1006c = new Order(1006L,"完成"); list.add(order1001c); list.add(order1006c); return list; } }
/** * Producer端確保消息順序惟一要作的事情就是將消息路由到特定的隊列, * 在RocketMQ中,經過MessageQueueSelector來實現分區的選擇 */ public class ProducerOrder { //nameserver地址 private static String namesrvaddress="127.0.0.1:9876;"; public static void main(String[] args) throws Exception { //建立DefaultMQProducer DefaultMQProducer producer = new DefaultMQProducer("order_producer_name"); //設置namesrv地址 producer.setNamesrvAddr(namesrvaddress); //啓動Producer producer.start(); List<Order> orderList = Order.buildOrders(); for (Order order : orderList) { String body = order.toString(); //建立消息 Message message = new Message("orderTopic","order",body.getBytes()); //發送消息 SendResult sendResult = producer.send( message, new MessageQueueSelector() { /** * * @param mqs topic中的隊列集合 * @param msg 消息對象 * @param arg 業務參數 * @return */ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //參數是訂單id號 Long orderId = (Long) arg; //肯定選擇的隊列的索引 long index = orderId % mqs.size(); return mqs.get((int) index); } }, order.getOrderId()); System.out.println("發送結果="+sendResult); } //關閉Producer producer.shutdown(); } }
/** * 消費者端實現MessageListenerOrderly介口監聽消息來實現順序消息 */ public class ConsumerOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //從第一個開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("orderTopic","*"); //MessageListenerOrderly 順序消費 consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println("當前線程:"+Thread.currentThread().getName()+",接收消息:"+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer started.%n"); } }
RocketMQ 支持定時(延遲)消息,可是不支持任意時間精度,僅支持特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。
延遲消息能夠在生產者中直接設置,也能夠在rocketmq的配置文件broker.conf中配置:messageDelayLevel=1s|5s|1m|2m|1h|2h......
/** * 延遲消息 生產者 */ public class ProducerDelay { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); //設置nameserver producer.setNamesrvAddr("127.0.0.1:9876"); //生產者開啓 producer.start(); //建立消息對象 Message message = new Message("delayTopic","delay","hello world".getBytes()); //設置延遲時間級別 message.setDelayTimeLevel(2); //發送消息 SendResult sendResult = producer.send(message); System.out.println(sendResult); //生產者關閉 producer.shutdown(); } }
/** * 延遲消息 消費者 */ public class ConsumerDelay { public static void main(String[] args) throws Exception { //建立消費者對象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //設置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //設置主題和tag consumer.subscribe("delayTopic","*"); //註冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息ID:"+msg.getMsgId()+"發送時間:"+new Date(msg.getStoreTimestamp())+",延遲時間:"+(System.currentTimeMillis()-msg.getStoreTimestamp())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //開啓消費者 consumer.start(); System.out.println("消費者啓動"); } }
/** * 批量 生產者 */ public class ProducerBatch { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); //設置nameserver producer.setNamesrvAddr("127.0.0.1:9876"); //生產者開啓 producer.start(); //建立消息對象 集合 String topic = "batchTopic"; String tag = "batch"; List<Message> messageList = new ArrayList<Message>(); Message message1 = new Message(topic,tag,"hello world1".getBytes()); Message message2 = new Message(topic,tag,"hello world2".getBytes()); Message message3 = new Message(topic,tag,"hello world3".getBytes()); messageList.add(message1); messageList.add(message2); messageList.add(message3); //發送消息 SendResult sendResult = producer.send(messageList); System.out.println(sendResult); //生產者關閉 producer.shutdown(); } }
/** * 批量消費者 */ public class ConsumerBatch { public static void main(String[] args) throws Exception { //建立消費者對象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //設置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //設置主題和tag consumer.subscribe("batchTopic","*"); //註冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息ID:"+msg.getMsgId()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //開啓消費者 consumer.start(); System.out.println("消費者啓動"); } }
rocketmq默認採用的是集羣消費,咱們想要使用廣播消費,只需在消費者中加入consumer.setMessageModel(MessageModel.BROADCASTING)
這段配置,MessageModel.CLUSTERING
爲集羣模式,是默認的;
/** * 生產者 */ public class ProducerBroadcast { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); //設置nameserver producer.setNamesrvAddr("127.0.0.1:9876"); //生產者開啓 producer.start(); //建立消息對象 集合 String topic = "broadcastTopic"; String tag = "broad"; List<Message> messageList = new ArrayList<Message>(); Message message1 = new Message(topic,tag,"hello world1".getBytes()); Message message2 = new Message(topic,tag,"hello world2".getBytes()); Message message3 = new Message(topic,tag,"hello world3".getBytes()); messageList.add(message1); messageList.add(message2); messageList.add(message3); //發送消息 SendResult sendResult = producer.send(messageList); System.out.println(sendResult); //生產者關閉 producer.shutdown(); } }
/** * 消費者1 */ public class ConsumerBroadcast1 { public static void main(String[] args) throws Exception { //建立消費者對象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //設置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //設置主題和tag consumer.subscribe("broadcastTopic","*"); //設置消息模式 爲 廣播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //註冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消費者1:消息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //開啓消費者 consumer.start(); System.out.println("消費者1啓動"); } }
/** * 消費者2 */ public class ConsumerBroadcast2 { public static void main(String[] args) throws Exception { //建立消費者對象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //設置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //設置主題和tag consumer.subscribe("broadcastTopic","*"); //設置消息模式 爲 廣播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //註冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消費者2:消息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //開啓消費者 consumer.start(); System.out.println("消費者2啓動"); } }
建立一個maven工程,導入依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.2.1.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> </dependencies>
/** * 模擬消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long orderId; private String desc; public static List<Order> buildOrders(){ List<Order> list = new ArrayList<Order>(); Order order1001a = new Order(1001L,"1001建立"); Order order1004a = new Order(1004L,"1004建立"); Order order1006a = new Order(1006L,"1006建立"); Order order1009a = new Order(1009L,"1009建立"); list.add(order1001a); list.add(order1004a); list.add(order1006a); list.add(order1009a); Order order1001b = new Order(1001L,"1001付款"); Order order1004b = new Order(1004L,"1004付款"); Order order1006b = new Order(1006L,"1006付款"); Order order1009b = new Order(1009L,"1009付款"); list.add(order1001b); list.add(order1004b); list.add(order1006b); list.add(order1009b); Order order1001c = new Order(1001L,"1001完成"); Order order1006c = new Order(1006L,"1006完成"); list.add(order1001c); list.add(order1006c); return list; } }
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class RocketMQTest { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 普通消息生產者 */ @Test public void testSend(){ rocketMQTemplate.convertAndSend("testTopic","這是測試消息!"); } /** * 延遲消息生產者 */ @Test public void testDelaySend(){ SendResult sendResult = rocketMQTemplate.syncSend("testTopic", new GenericMessage("這是延遲測試消息!"+new Date()), 10000, 4); log.info("sendResult=="+sendResult); } /** * 順序消息 生產者 */ @Test public void testOrderlySend(){ List<Order> orderList = Order.buildOrders(); for (Order order : orderList) { //發送消息 rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //參數是訂單id號 Long orderId = Long.valueOf((String)arg); //肯定選擇的隊列的索引 long index = orderId % mqs.size(); log.info("mqs is ::" + mqs.get((int) index)); return mqs.get((int) index); } }); SendResult sendOrderly = rocketMQTemplate.syncSendOrderly("testTopicOrderLy", new GenericMessage<String>(order.toString()), order.getOrderId().toString()); log.info("發送結果="+sendOrderly+",orderid :"+order.getOrderId()); } } }
/** * 普通、延遲消息 消費者代碼 */ @Component @RocketMQMessageListener(consumerGroup = "myConsumer", topic = "testTopic") public class RocketConsumer implements RocketMQListener<String> { public void onMessage(String message) { System.out.println("接收到消息:="+message); } }
/** * 順序消息 ,消費者 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "myConsumerOrderly", topic = "testTopicOrderLy",consumeMode = ConsumeMode.ORDERLY) public class RocketConsumerOrderly implements RocketMQListener<String> { public void onMessage(String message) { log.info("當前線程:"+Thread.currentThread().getName()+",接收到消息:="+message); } }
1.啓動mqnamesrv.cmd start mqnamesrv.cmd
成功的彈窗,此框勿關閉。
2.啓動mqbroker.cmd start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
成功的彈窗,此框勿關閉。
注意:假如彈出提示框提示‘錯誤: 找不到或沒法加載主類 xxxxxx’。打開runbroker.cmd,而後將‘%CLASSPATH%’加上英文雙引號。保存並從新執行start語句。
1) 下載地址: https://github.com/apache/rocketmq-externals/releases
2) 修改rocketmq-console\src\main\resources\application.properties,修改以下:
3) cmd窗口執行:mvn clean package -Dmaven.test.skip=true
4) jar包運行:java -jar rocketmq-console-ng-1.0.0.jar
5) 測試輸入地址: http://127.0.0.1:8080/#/ops