SpringBoot RocketMQ 整合使用和監控

圖片

前提

經過前面兩篇文章能夠簡單的瞭解 RocketMQ 和 安裝 RocketMQ ,今天就將 SpringBoot 和 RocketMQ 整合起來使用。java

建立項目

在 IDEA 建立一個 SpringBoot 項目,項目結構以下:git

圖片

pom 文件

引入 RocketMQ 的一些相關依賴,最後的 pom 文件以下:github

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhisheng</groupId>
    <artifactId>rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>rocketmq</name>
    <description>Demo project for Spring Boot RocketMQ</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.2.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

配置文件

application.properties 中以下:web

# 消費者的組名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生產者的組名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876

生產者

package com.zhisheng.rocketmq.client;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import javax.annotation.PostConstruct;
/**
 * Created by zhisheng_tian on 2018/2/6
 */
@Component
public class RocketMQClient {
    /**
     * 生產者的組名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;
    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    @PostConstruct
    public void defaultMQProducer() {
        //生產者的組名
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多個地址以 ; 隔開
        producer.setNamesrvAddr(namesrvAddr);
        try {
            /**
             * Producer對象在使用以前必需要調用start初始化,初始化一次便可
             * 注意:切記不能夠在每次發送消息時,都調用start方法
             */
            producer.start();
               //建立一個消息實例,包含 topic、tag 和 消息體
             //以下:topic 爲 "TopicTest",tag 爲 "push"
            Message message = new Message("TopicTest", "push", "發送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
            StopWatch stop = new StopWatch();
            stop.start();
            for (int i = 0; i < 10000; i++) {
                SendResult result = producer.send(message);
                System.out.println("發送響應:MsgId:" + result.getMsgId() + ",發送狀態:" + result.getSendStatus());
            }
            stop.stop();
            System.out.println("----------------發送一萬條消息耗時:" + stop.getTotalTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}

消費者

package com.zhisheng.rocketmq.server;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Created by zhisheng_tian on 2018/2/6
 */
@Component
public class RocketMQServer {
    /**
     * 消費者的組名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;
    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    @PostConstruct
    public void defaultMQPushConsumer() {
        //消費者的組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        //指定NameServer地址,多個地址以 ; 隔開
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //訂閱PushTopic下Tag爲push的消息
            consumer.subscribe("TopicTest", "push");
            //設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費
            //若是非第一次啓動,那麼按照上次消費的位置繼續消費
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        System.out.println("messageExt: " + messageExt);//輸出消息內容
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//輸出消息內容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍後再試
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

啓動類

package com.zhisheng.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketmqApplication.class, args);
    }
}

RocketMQ

代碼已經都寫好了,接下來咱們須要將與 RocketMQ 有關的啓動起來。spring

啓動 Name Server

在前面文章中已經寫過怎麼啓動,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServerdocker

進入到目錄 :apache

cd distribution/target/apache-rocketmq

啓動:app

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log //經過日誌查看是否啓動成功

啓動 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log    //經過日誌查看是否啓動成功

而後運行啓動類,運行效果以下:maven

圖片

監控

RocketMQ有一個對其擴展的開源項目 ocketmq-console ,現在也提交給了 Apache ,地址在:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console ,官方也給出了其支持的功能的中文文檔:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/100/UserGuide_CN.md , 那麼該如何安裝?ide

Docker 安裝

一、獲取 Docker 鏡像

docker pull styletang/rocketmq-console-ng

二、運行,注意將你本身的 NameServer 地址替換下面的 127.0.0.1

docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

非 Docker 安裝

咱們 git clone 一份代碼到本地:

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console/

須要 jdk 1.7 以上。 執行如下命令:

mvn spring-boot:run

或者

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

注意:

一、若是你下載依賴緩慢,你能夠從新設置 maven 的 mirror 爲阿里雲的鏡像

<mirrors>
    <mirror>
          <id>alimaven</id>
          <name>aliyun maven</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <mirrorOf>central</mirrorOf>        
    </mirror>
</mirrors>

二、若是你使用的 RocketMQ 版本小於 3.5.8,若是您使用 rocketmq < 3.5.8,請在啓動 rocketmq-console-ng 時添加 -Dcom.rocketmq.sendMessageWithVIPChannel=false(或者您能夠在 ops 頁面中更改它)

三、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者能夠在ops頁面中更改它)

錯誤解決方法

一、Docker 啓動項目報錯

org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to<null>failed

圖片

將 Docker 啓動命令改爲以下之後:

docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

報錯信息改變了,新的報錯信息以下:

ERROR op=global_exception_handler_print_error
org.apache.rocketmq.console.exception.ServiceException: This date have't data!

圖片

看到網上有人也遇到這個問題,他們都經過本身的方式解決了,可是方法我都試了,不適合我。不得不說,阿里,你能再用心點嗎?既然把 RocketMQ 捐給 Apache 了,這些文檔啥的都必須更新啊,不要還滯後着呢,否則少不了被吐槽!

搞了好久這種方法沒成功,暫時放棄!mmp

二、非 Docker 安裝,只好把源碼編譯打包了。

1) 注意須要修改以下圖中的配置:

圖片

rocketmq.config.namesrvAddr=localhost:9876        //注意替換你本身的ip
#若是你 rocketmq 版本小於 3.5.8 才需設置 `rocketmq.config.isVIPChannel` 爲 false,默認是 true, 這個能夠在源碼中能夠看到的
rocketmq.config.isVIPChannel=

2) 執行如下命令:

mvn clean package -Dmaven.test.skip=true

編譯成功:

圖片

能夠看到已經打好了 jar 包:

運行:

java -jar rocketmq-console-ng-1.0.0.jar

成功,不報錯了,開心,訪問 http://localhost:8080/

圖片

圖片

圖片

圖片

圖片

圖片

圖片

整個監控大概就是這些了。

而後我運行以前的 SpringBoot 整合項目,查看監控信息以下:

圖片

總結

整篇文章講述了 SpringBoot 與 RocketMQ 整合和 RocketMQ 監控平臺的搭建。

參考文章

一、http://www.ymq.io/2018/02/02/spring-boot-rocketmq-example/#%E6%96%B0%E5%8A%A0%E9%A1%B9%E7%9B%AE

二、GitHub 官方 README

相關文章

一、SpringBoot Kafka 整合使用

二、SpringBoot RabbitMQ 整合使用

三、SpringBoot ActiveMQ 整合使用

四、Kafka 安裝及快速入門

五、SpringBoot RabbitMQ 整合進階版

六、RocketMQ 初探

七、RocketMQ 安裝及快速入門

關注我

圖片

轉載請務必註明原創地址爲:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/

相關文章
相關標籤/搜索