最近在本身本地搭建rocketMQ,過程當中遇到一些問題,如今總結一下,便於之後查看.html
首先打開rocket官網:http://rocketmq.apache.org, 點擊latest realease,有source版本和binary版本供下載,點擊quikstart能夠看到source版本的構建步驟:java
source版本須要先安裝瞭如下軟件:apache
我係統環境是OS 10.13.6,下載的是binary版本,省去了構建成爲二進制文件的步驟.bash
將rocketmq-all-4.4.0-bin-release.zip文件解壓後獲得rocketmq-all-4.4.0-bin-release文件夾,服務器
配置環境變量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夾父路徑,異步
例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.async
再將ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATHmaven
打開teiminal,輸入sh mqnamesrv啓動名稱服務器,以下圖:ide
能夠看到」Java HotSpot(TM) 64-Bit Server VM warning「警告信息,不要怕,工具
它是說CMS垃圾收集器已通過時,在將來的JVM版本中會被其餘收集器替代,能夠不用管他.
namesrv已經正常啓動了
一樣,再開一個terminal,輸入sh mqbroker 啓動broker.
若是在啓動過程當中遇到:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
不要慌,這是說你的JAVA_HOME環境變量設置有問題,請檢查環境變量.
若是檢查後發現環境變量沒問題,但仍是報以上錯誤,這多是系統問題吧,
記得OS系統有好幾個文件能夠配置環境變量,最多見的是用戶目錄下的.bash_profile,對當前用戶生效,還有一個全局的,在etc目錄下的profile文件
能夠再確認一下,若是兩個文件都配置了依然報錯,那隻好用最戳的方式解決:
打開mq的安裝目錄,找到bin文件夾下的runserver.sh和runbroker.sh,
在文件中主動加入你的JAVA_HOME,以下圖:
一切順利解決,至此namesrv和broker都已經成功啓動,下面來寫幾個Javademo
首先加入maven依賴包:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
使用GRADLE構建的話加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
使用GRADLE構建的話加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
使用RocketMQ有如下3種發送消息的方式,分別適用不一樣場景:
咱們選第一種發送方式寫一個demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
經過循環發送100條消息,運行代碼,結果拋出以下錯誤:
該錯誤表示找不到這個topic的路由,關於這個錯誤,網上也有一些對策,無外乎如下幾個緣由:
1.broker沒有連上namesrv
2.producer沒有連上namesrv
3.namesrv沒有建立並維護該topic信息
4.netty包版本衝突
5.防火牆問題
前面3條能夠經過查找日誌肯定問題,
1).broker.log裏面若是查到register broker to name server localhost:9876 OK 這樣的信息表示broker已經連上namesrv了.
若是沒有,那麼你能夠從新啓動broker: sh mqbroker -n localhost:9876
2).namesrv.log裏面若是可以查到 new topic registered, TopicTest QueueData這樣的信息,表示你的topic也已經被成功建立.
若是沒有,那麼你能夠經過mq的admin工具主動生成該topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest
3).producer有沒有連上namesrv,只需檢查一下代碼中namesrv的值對不對就好了
4).至於4和5那就須要你本身去跟蹤源碼發現問題了.
如今再來運行生產者代碼,成功發送100條消息:
消費者也來寫個demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
成功消費100條消息:
另外,關於rocketMQ的一些基本概念請參考另外一篇: http://www.javashuo.com/article/p-cyjwsvap-hm.html
下次來看看本地僞集羣怎麼搭建: http://www.javashuo.com/article/p-gxrmdamh-hp.html