Rocket MQ 問題排查命令

修改rocketmq官方代碼進行測試:

package com.alibaba.middleware.race.rocketmq;
import java.util.Scanner;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
 * Producer,模擬rocket mq使用中可能出現的問題,學習如何排查q問題
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("procedure_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        final String topics = "TOPIC-IT-WORKER-TEST";
        for (int i = 0; i < 1000; i++) {
            @SuppressWarnings("resource")
            Scanner reader=new Scanner(System.in);
            int key = reader.nextInt();
            final String message = " order-message - " + i + " key: " + key;
            byte[] body = message.getBytes();
            Message msgToBroker = new Message(topics, "tag-push", String.valueOf(key), body);
            producer.send(msgToBroker, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.println(message);
                }
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
    }
}
package com.alibaba.middleware.race.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Scanner;

/**
 * Push consumer for {@code TOPIC-IT-WORKER-TEST} (all tags) in group
 * {@code consumer_group_name}, starting from the first offset.
 *
 * <p>Each message is printed together with its keys and tags; the listener
 * then sleeps for three seconds before handling the next message.
 */
public class Consumer {
    /**
     * Configures the consumer against the name server at 127.0.0.1:9876,
     * registers the listener and starts consuming.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature
     * @throws MQClientException    if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TOPIC-IT-WORKER-TEST", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // A fresh Scanner per message: hasNext() blocks until standard input
                    // offers a token (or reaches end of input) and does not consume it.
                    // NOTE(review): presumably a deliberate gate so that consumption can
                    // be stalled by hand while studying backlog - confirm before changing.
                    @SuppressWarnings("resource")
                    Scanner reader = new Scanner(System.in);
                    reader.hasNext();
                    byte[] body = msg.getBody();
                    // A two-byte body of zeros is treated as an end marker and skipped.
                    if (body.length == 2 && body[0] == 0 && body[1] == 0) {
                        System.out.println("Got the end signal");
                        continue;
                    }
                    // Decode with an explicit charset rather than the platform default.
                    String paymentMessage = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(paymentMessage + " key: " + msg.getKeys() + " tag: " + msg.getTags());
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the owning thread pool can still see it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the RocketMQ Producer/Consumer demo (artifact preliminary.demo). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.race</groupId>
    <artifactId>preliminary.demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <!-- Compile with Java 1.7 source and target levels. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <!-- Assembly: the "single" goal is bound to the install phase below.
                 It is configured with both a project descriptor file and the
                 jar-with-dependencies descriptorRef, with appendAssemblyId disabled.
                 NOTE(review): src/main/resources/assembly.xml is not shown here;
                 confirm that it exists. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptors>
                        <descriptor>src/main/resources/assembly.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>install</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

<!-- Original remark (translated): the default logging framework of
     com.alibaba.middleware.race.jstorm 2.1.1 is logback; log4j is excluded to avoid a logging conflict.
     NOTE(review): no jstorm dependency and no log4j exclusion are declared below,
     so this remark looks like a leftover; confirm and remove if so. -->
    <dependencies>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
    </dependencies>
</project>
增加selector選擇器,根據key選擇消息要進入的Broker隊列:
producer.send(msgToBroker, new MessageQueueSelector() {
                /**
                 * Chooses the target queue from the key handed to send() as {@code arg}.
                 *
                 * @param mqs         candidate queues for the topic
                 * @param msgToBroker the message being sent
                 * @param arg         the Integer key passed as the selector argument
                 * @return the queue at position (key mod queue count)
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msgToBroker, Object arg) {
                    // Select the queue according to the key.
                    Integer id = (Integer) arg;
                    int queueCount = mqs.size();
                    // Java's % keeps the sign of the dividend, so a negative key would
                    // produce a negative index and mqs.get() would throw. Normalise the
                    // remainder into [0, queueCount). Math.floorMod is avoided because
                    // the build targets Java 1.7.
                    int index = ((id % queueCount) + queueCount) % queueCount;
                    for (MessageQueue mq : mqs) {
                        System.out.println("current queue: " + mq.getQueueId());
                    }
                    System.out.println("select id: " + index);
                    return mqs.get(index);
                }
            }, key, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(message);
                }

                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });

 

編譯後啓動服務端和客戶端
進入target目錄
啓動生產者生產數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Producer
啓動消費者消費數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Consumer

 

啓動&參數修改

mqnamesrv   啓動NameServer  jps - NamesrvStartup
mqbroker -n localhost:9876 啓動broker jps - BrokerStartup 默認端口10911
mqadmin updateBrokerConfig -c DefaultCluster -n 127.0.0.1:9876 -k listenPort -v 10911 更新broker參數配置

查看當前系統狀態

mqadmin clusterList -n 127.0.0.1:9876

查看當前全部topic(topicList)

mqadmin topicList -n 127.0.0.1:9876

查看broker狀態

mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911

查看某個topic的狀態

mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST
當前可見,producer只發送了一條消息,Max offset爲1,最後收到消息的時間是last updated,因爲配置四個broker都是本機,只有第一個收到了當前第一條消息
第二張圖爲發了四條消息以後的狀態,看起來是輪詢分配的,因爲當我再增加4條key爲1的msg以後,仍然是四個節點每個兩條

 

查看連接的producer/consumer

mqadmin producerConnection -n 127.0.0.1:9876 -g procedure_group_name -t TOPIC-IT-WORKER-TEST
mqadmin consumerConnection -n 127.0.0.1:9876 -g consumer_group_name

查看某個key對應的msg

mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST-1 -k 1
因爲之前發送了5條key爲1的數據,所以這裏可以看到5條,每條都有一個MESSAGE ID

根據ID查看對應的MSG

mqadmin queryMsgById  -g consumer_group_name -i AC1F78B700002A9F00000000000A3208  -n 127.0.0.1:9876

根據位置偏移查詢上面的那條數據

 mqadmin queryMsgByOffset -n  127.0.0.1:9876 -o 1 -t TOPIC-IT-WORKER-TEST-1 -i 1 -b izm5e210z0uiwyavdbmpxaz

查看消費詳情

mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer_group_name
這裏消費了一條,一共八條,差7條沒有消費

重置消費端offset

mqadmin resetOffsetByTime  -n 127.0.0.1:9876 -g consumer_group_name -t TOPIC-IT-WORKER-TEST-1 -f true -s 1536820000

打印broker中某個隊列裏的消息

mqadmin printMsgByQueue -a izm5e210z0uiwyavdbmpxaz -t T1 -n 127.0.0.1:9876 -i 1 -p true -d true
這裏能夠看出,storeSize最後多了一條緣由就是最後的body裏12是兩位,廢話。。。

直接打印消息

mqadmin printMsg -t TOPIC-IT-WORKER-TEST-1 -n 127.0.0.1:9876

The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   queryMsgByUniqueKey  Query Message by Unique key
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client version
   consumerConnection   Query consumer's socket connection, client version and subscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.
相關文章
相關標籤/搜索