每章一點正能量:自我控制是最強者的本能。——蕭伯納
最近在學習消息中間件——RocketMQ,打算把這個學習過程記錄下來。此章主要介紹環境搭建。這次主要是單機搭建(條件有限),包括在Windows、Linux環境下的搭建,以及console監控平臺搭建,最後加一demo驗證一下。html
在搭建RocketMQ以前,請先確保以下環境已經搭建完畢java
沒有搭建的同窗走傳送門:linux
JDK環境搭建: JAVA8環境搭建
Maven環境搭建: Windows環境下使用Nexus 3.X 搭建Maven私服及使用介紹
Git環境搭建:Git環境搭建及配置git
官方網站:http://rocketmq.apache.org/github
目前最新版的是V4.5.0,點擊進去。
選擇下載 rocketmq-all-4.5.0-bin-release.zip。彈出另一個頁面,這裏選擇rocketmq-all-4.5.0-bin-release.zip進行下載。
下載成功後,選擇一個目錄放好並解壓。
web
以上操做完畢以後,進入目錄bin目錄,我這裏是H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
。
找到runserver.cmd
和runbroker.cmd
中的JAVA_OPT。
原JAVA_OPT:spring
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
將 Xms Xmx 這兩個值改小一些,改成1g,如:apache
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
本身根據虛擬機內存大小設置,超出內存大小可能會報錯。
上述步驟執行完畢後,咱們須要將RocketMQ安裝目錄的bin目錄配置到環境變量中。編程
以上配置都完成,接下來就是啓動過程。中間有點坑,請務必按步驟安裝。vim
在RocketMQ安裝目錄的bin目錄下,執行命令cmd:
個人目錄:
H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
能夠經過shift+鼠標右擊 觸發cmd窗口選項。也能夠經過win+R 在窗口輸入cmd,進入cmd窗口後移動到bin目錄下。
成功後會彈出提示框,此框勿關閉。
注意:假如彈出提示框提示‘錯誤: 找不到或沒法加載主類 xxxxxx’。打開runbroker.cmd,而後將‘%CLASSPATH%’加上英文雙引號。
打開 runbroker.cmd
進行修改
原:
set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
修改後:
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
再次執行命令:
啓動成功!
這時候一共有三個窗口。
下載地址:https://github.com/apache/roc...
下載完後如圖所示:選擇——>rocketmq-console
下載完成以後,進入‘rocketmq-externalsrocketmq-consolesrcmainresources’文件夾,打開‘application.properties’進行配置。
進入‘rocketmq-externalsrocketmq-console’文件夾,執行‘mvn clean package -Dmaven.test.skip=true’,編譯生成。中間有個比較慢的下載過程須要等待。
編譯成功以後,cmd進入‘target’文件夾,執行‘java -jar rocketmq-console-ng-1.0.1.jar’,啓動‘rocketmq-console-ng-1.0.1.jar’。
訪問地址:localhost:8082
下載JDK:https://www.oracle.com/techne...
下載須要的版本:
上傳到建立的目錄/usr/java
解壓命令
tar -zxvf jdk-8u181-linux-x64.tar.gz
配置環境變量命令
vim /etc/profile JAVA_HOME=/usr/java/jdk1.8.0_161 JRE_HOME=/usr/java/jdk1.8.0_161/jre CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin export JAVA_HOME JRE_HOME CLASS_PATH PATH source /etc/profile
驗證是否成功命令
java -version
按照以上操做,完成JDK的安裝。接下來安裝Maven環境。
wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz
tar -zxvf apache-maven-3.2.2-bin.tar.gz
vim /etc/profile #配置maven環境變量 export MAVEN_HOME=/usr/maven/apache-maven-3.5.4 export MAVEN_HOME export PATH=$PATH:$MAVEN_HOME/bin source /etc/profile
mvn -v
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip
unzip rocketmq-all-4.4.0-source-release.zip
進入解壓後的文件目錄。
mvn -Prelease-all -DskipTests clean install -U
同Windows環境同樣,修改JVM配置。
移動到目錄 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin
中。編輯bin目錄下runserver.sh
與 runbroker.sh
文件。
根據我的虛擬機大小進行修改
vim runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m" vim runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
分別執行以下命令:
#修改環境變量 vim /etc/profile export ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq export PATH=$PATH:$ROCKETMQ/bin #更新配置 source /etc/profile
依然在以前的目錄 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
##啓動命令 nohup sh bin/mqnamesrv >/dev/null 2>&1 & ##查看日誌 tail -f ~/logs/rocketmqlogs/namesrv.log
能夠看圖已經成功了!
##啓動命令 nohup sh bin/mqbroker -n localhost:9876 & ##查看日誌 tail -f ~/logs/rocketmqlogs/broker.log
注意防火牆,若是端口鏈接失敗,注意開通。
sh bin/mqshutdown broker //中止 broker sh bin/mqshutdown namesrv //中止 nameserver
同Windows平臺搭建
我這裏直接將Windows平臺打包好的jar包直接丟到了Linux系統中
java -jar rocketmq-console-ng-1.0.1.jar
訪問地址:http://192.168.220.72:8082
這裏不作過多介紹,能夠參考如下文章
官網地址:https://github.com/apache/roc...
其餘博客地址:https://guozh.net/rocketmqzhi...
案例整合環境:SpringBoot環境
案例來源於網絡
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.coderprogramming.rocketmq</groupId> <artifactId>rocketmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rocketmq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
** * @Description: 生產者 * @author Coder編程 * @date 2019/5/8 17:08 */ @Component public class Producer { /** * 生產者的組名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; public void orderedProducer() throws MQClientException, InterruptedException { /** * 一個應用建立一個Producer,由應用來維護此對象,能夠設置爲全局對象或者單例 * 注意:ProducerGroupName須要由應用來保證惟一 * ProducerGroup這個概念發送普通的消息時,做用不大,可是發送分佈式事務消息時,比較關鍵, * 由於服務器會回查這個Group下的任意一個Producer */ DefaultMQProducer producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); /** * Producer對象在使用以前必需要調用start初始化,初始化一次便可 注意:切記不能夠在每次發送消息時,都調用start方法 */ producer.start(); /** * 下面這段代碼代表一個Producer對象能夠發送多個topic,多個tag的消息。 * 注意:send方法是同步調用,只要不拋異常就標識成功。可是發送成功也可會有多種狀態 * 例如消息寫入Master成功,可是Slave不成功,這種狀況消息屬於成功,可是對於個別應用若是對消息可靠性要求極高, * 須要對這種狀況作處理。另外,消息可能會存在發送失敗的狀況,失敗重試由應用來處理。 */ try { for (int i = 0; i < 10; i++) { Message msg = new Message("Topic1",// topic "TagA",// tag "001",// key ("Send Msg:Hello MetaQ1").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); Message msg2 = new Message("Topic2",// topic "TagB",// tag "002",// key ("Send Msg:Hello MetaQ2").getBytes());// body SendResult sendResult2 = producer.send(msg2); System.out.println(sendResult2); Message msg3 = new Message("Topic3",// topic "TagC",// tag "003",// key ("Send Msg:Hello MetaQ3").getBytes());// body SendResult sendResult3 = producer.send(msg3); System.out.println(sendResult3); } } catch (Exception e) { e.printStackTrace(); } /** * 應用退出時,要調用shutdown來清理資源,關閉網絡鏈接,從MetaQ服務器上註銷本身 * 注意:咱們建議應用在JBOSS、Tomcat等容器的退出鉤子裏調用shutdown方法 */ producer.shutdown(); } }
/** * @Description: 消費者 * @author Coder編程 * @date 2019/5/8 17:08 */ @Component public class Consumer { /** * 生產者的組名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; /** * 當前例子是PushConsumer用法,使用方式給用戶感受是消息從RocketMQ服務器推到了應用客戶端。 * 可是實際PushConsumer內部是使用長輪詢Pull方式從Broker拉消息,而後再回調用戶Listener方法 */ public void orderedConsumer() throws InterruptedException,MQClientException { /** * 一個應用建立一個Consumer,由應用來維護此對象,能夠設置爲全局對象或者單例 * 注意:ConsumerGroupName須要由應用來保證惟一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup); // consumer.setNamesrvAddr("10.10.0.102:9876"); consumer.setNamesrvAddr(namesrvAddr); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("Topic1", "TagA || TagC || TagD"); /** * 訂閱指定topic下全部消息<br> * 注意:一個consumer對象能夠訂閱多個topic */ consumer.subscribe("Topic2", "*"); consumer.subscribe("Topic3", "*"); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默認msgs裏只有一條消息,能夠經過設置consumeMessageBatchMaxSize參數來批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("Topic1")) { if (null != msg.getTags()) { // 執行Topic1的消費邏輯 if (msg.getTags().equals("TagA")) { // 執行TagA的消費 System.out.println("TagA開始。"); } else if (msg.getTags().equals("TagC")) { System.out.println("TagC開始。"); // 執行TagC的消費 } else if (msg.getTags().equals("TagD")) { // 執行TagD的消費 System.out.println("TagD開始。"); } } } else if (msg.getTopic().equals("Topic2")) { // 執行Topic2的消費邏輯 System.out.println("Topic2"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用以前必需要調用start初始化,初始化一次便可 */ consumer.start(); System.out.println("Consumer Started."); } }
# 消費者的組名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生產者的組名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=192.168.220.72:9876 # 設置應用端口 server.port=8089
/** * @author Coder編程 * @Title: HelloWord * @ProjectName rocketmq * @Description: Hello World * @date 2019/5/814:14 */ @RestController public class Test { @Autowired private Producer producer; @Autowired private Consumer consumer; @RequestMapping("/test") public String testMQ2() { try { System.out.println("-----------------開始生產-----------------"); producer.orderedProducer(); System.out.println("-----------------開始消費-----------------"); consumer.orderedConsumer(); } catch (Exception e) { e.printStackTrace(); } return "success"; } }
以上安裝jar包和案例測試源碼已經上傳至GitHub/Gitee
源碼地址:
歡迎關注公衆號: Coder編程
獲取最新原創技術文章和相關免費學習資料,隨時隨地學習技術知識!