修改rocketmq官方代碼進行測試:
package com.alibaba.middleware.race.rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
 * Demo producer that simulates problems which may show up when using RocketMQ,
 * as a way to learn how to troubleshoot them.
 *
 * <p>For each of up to 1000 messages it reads one integer key from standard
 * input, builds a message whose key is that integer, and hands it to the
 * producer together with a {@link SendCallback}.
 */
public class Producer {

    /**
     * Starts the producer against the name server at {@code 127.0.0.1:9876} and
     * sends one message per integer read from standard input.
     *
     * @param args unused
     * @throws MQClientException    if the producer cannot be started or a send fails on the client side
     * @throws InterruptedException if the sending thread is interrupted
     * @throws RemotingException    if the remoting layer fails while sending
     */
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("procedure_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        final String topics = "TOPIC-IT-WORKER-TEST";

        // A single Scanner is shared by every iteration. Scanner reads ahead into
        // its own buffer, so creating a fresh one per iteration can silently drop
        // keys that an earlier instance had already buffered (piped/pasted input).
        // It is intentionally not closed: closing it would also close System.in.
        @SuppressWarnings("resource")
        Scanner reader = new Scanner(System.in);

        for (int i = 0; i < 1000; i++) {
            // Blocks until the user supplies the next key.
            int key = reader.nextInt();

            final String message = " order-message - " + i + " key: " + key;
            // Explicit charset so the payload does not depend on the JVM default.
            byte[] body = message.getBytes(StandardCharsets.UTF_8);
            Message msgToBroker = new Message(topics, "tag-push", String.valueOf(key), body);

            producer.send(msgToBroker, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(message);
                }

                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
        // NOTE(review): the producer is never shut down here, matching the original;
        // confirm whether a shutdown after all callbacks complete is wanted.
    }
}
package com.alibaba.middleware.race.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Scanner;

/**
 * Demo push consumer for the {@code TOPIC-IT-WORKER-TEST} topic.
 *
 * <p>Every received message is printed together with its keys and tags, after
 * which the listener sleeps for three seconds before handling the next one.
 */
public class Consumer {

    /**
     * Configures the consumer against the name server at {@code 127.0.0.1:9876},
     * subscribes to all tags of the topic, registers the listener and starts consuming.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility with existing callers
     * @throws MQClientException    if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TOPIC-IT-WORKER-TEST", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Blocks until standard input has a token available.
                    // NOTE(review): this looks like a deliberate manual gate used to
                    // simulate a stalled consumer - confirm before removing. A new
                    // Scanner per message is kept on purpose so the behaviour of the
                    // gate is unchanged.
                    Scanner reader = new Scanner(System.in);
                    reader.hasNext();

                    byte[] body = msg.getBody();
                    // A body of exactly two zero bytes is treated as an end marker and skipped.
                    if (body.length == 2 && body[0] == 0 && body[1] == 0) {
                        System.out.println("Got the end signal");
                        continue;
                    }

                    // Explicit charset so decoding does not depend on the JVM default.
                    String paymentMessage = new String(body, StandardCharsets.UTF_8);
                    System.out.println(paymentMessage + " key: " + msg.getKeys() + " tag: " + msg.getTags());

                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt status so the thread that owns this
                        // listener can still observe the interruption.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the RocketMQ producer/consumer demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.race</groupId>
    <artifactId>preliminary.demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <!-- Compile for Java 1.7 source and bytecode level. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>

            <!-- Build the assembly during the install phase; the assembly id is not appended to the artifact name. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptors>
                        <descriptor>src/main/resources/assembly.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>install</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <!--
        The default logging framework of com.alibaba.middleware.race.jstorm-2.1.1 is logback;
        log4j is excluded to avoid logging conflicts.
        NOTE(review): no such exclusion is visible in this POM - confirm whether this comment is stale.
    -->
    <dependencies>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
    </dependencies>
</project>
// Add a MessageQueueSelector so that the message key decides which broker queue the message enters.
producer.send(msgToBroker, new MessageQueueSelector() {
    /**
     * Picks the target queue from the key passed as the selector argument.
     *
     * @param mqs         queues currently available for the topic
     * @param msgToBroker the message being sent
     * @param arg         the selector argument given to send(); here the Integer key
     * @return the queue at position (key mod number of queues)
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msgToBroker, Object arg) {
        // Select the queue according to the key.
        int id = (Integer) arg;
        int size = mqs.size();
        // Java's % keeps the sign of the dividend, so a negative key would yield a
        // negative index and mqs.get() would throw; normalise into [0, size).
        int index = ((id % size) + size) % size;
        for (MessageQueue mq : mqs) {
            System.out.println("current queue: " + mq.getQueueId());
        }
        System.out.println("select id: " + index);
        return mqs.get(index);
    }
}, key, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println(message);
    }

    @Override
    public void onException(Throwable throwable) {
        throwable.printStackTrace();
    }
});
編譯後啓動服務端和客戶端 進入target目錄 啓動生產者生產數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Producer 啓動消費者消費數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Consumer
啓動&參數修改
mqnamesrv    # 啓動NameServer(jps 可看到 NamesrvStartup)
mqbroker -n localhost:9876    # 啓動broker(jps 可看到 BrokerStartup),默認端口10911
mqadmin updateBrokerConfig -c DefaultCluster -n 127.0.0.1:9876 -k listenPort -v 10911    # 更新broker參數配置
查看當前系統狀態
mqadmin clusterList -n 127.0.0.1:9876
查看當前全部topic(topicList)
mqadmin topicList -n 127.0.0.1:9876
查看broker狀態
mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911
查看某個topic的狀態
mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST
當前可見,producer只發送了一條消息,Max offset爲1,最後收到消息的時間是last updated,因爲配置四個broker都是本機,只有第一個收到了當前第一條消息
第二張圖爲發了四條消息以後的狀態,看起來是輪詢分配的,因爲當我再增加4條key爲1的msg以後,仍然是四個節點每個兩條
查看連接的producer/consumer
mqadmin producerConnection -n 127.0.0.1:9876 -g procedure_group_name -t TOPIC-IT-WORKER-TEST
mqadmin consumerConnection -n 127.0.0.1:9876 -g consumer_group_name
查看某個key對應的msg
mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST-1 -k 1
因爲之前發送了5條key爲1的數據,所以這裏可以看到5條,每條都有一個MESSAGE ID
根據ID查看對應的MSG
mqadmin queryMsgById -g consumer_group_name -i AC1F78B700002A9F00000000000A3208 -n 127.0.0.1:9876
根據位置偏移查詢上面的那條數據
mqadmin queryMsgByOffset -n 127.0.0.1:9876 -o 1 -t TOPIC-IT-WORKER-TEST-1 -i 1 -b izm5e210z0uiwyavdbmpxaz
查看消費詳情
mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer_group_name
這裏消費了一條,一共八條,差7條沒有消費
重置消費端offset
mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g consumer_group_name -t TOPIC-IT-WORKER-TEST-1 -f true -s 1536820000
打印broker中某個隊列裏的消息
mqadmin printMsgByQueue -a izm5e210z0uiwyavdbmpxaz -t T1 -n 127.0.0.1:9876 -i 1 -p true -d true
這裏能夠看出,storeSize最後多了一條緣由就是最後的body裏12是兩位,廢話。。。
直接打印消息
mqadmin printMsg -t TOPIC-IT-WORKER-TEST-1 -n 127.0.0.1:9876
The most commonly used mqadmin commands are: updateTopic Update or create topic deleteTopic Delete topic from broker and NameServer. updateSubGroup Update or create subscription group deleteSubGroup Delete subscription group from broker. updateBrokerConfig Update broker's config updateTopicPerm Update topic perm topicRoute Examine topic route info topicStatus Examine topic Status info topicClusterList get cluster info for topic brokerStatus Fetch broker runtime status data queryMsgById Query Message by Id queryMsgByKey Query Message by Key queryMsgByUniqueKey Query Message by Unique key queryMsgByOffset Query Message by offset queryMsgByUniqueKey Query Message by Unique key printMsg Print Message Detail printMsgByQueue Print Message Detail sendMsgStatus send msg to broker. brokerConsumeStats Fetch broker consume stats data producerConnection Query producer's socket connection and client version consumerConnection Query consumer's socket connection, client version and subscription consumerProgress Query consumers's progress, speed consumerStatus Query consumer's internal data structure cloneGroupOffset clone offset from other group. clusterList List all of clusters topicList Fetch all topic list from name server updateKvConfig Create or update KV config. deleteKvConfig Delete KV config. wipeWritePerm Wipe write perm of broker in all name server resetOffsetByTime Reset consumer offset by timestamp(without client restart). updateOrderConf Create or update or delete order conf cleanExpiredCQ Clean expired ConsumeQueue on broker. cleanUnusedTopic Clean unused topic on broker. startMonitoring Start Monitoring statsAll Topic and Consumer tps stats allocateMQ Allocate MQ checkMsgSendRT check message send response time clusterRT List All clusters Message Send RT getNamesrvConfig Get configs of name server. updateNamesrvConfig Update configs of name server. getBrokerConfig Get broker config by cluster or special broker! queryCq Query cq command.