rocketMQ 本地環境搭建

  最近在本身本地搭建rocketMQ,過程當中遇到一些問題,如今總結一下,便於之後查看.html

  首先打開rocket官網:http://rocketmq.apache.org, 點擊latest realease,有source版本和binary版本供下載,點擊quikstart能夠看到source版本的構建步驟:java

      source版本須要先安裝瞭如下軟件:apache

  1. 建議使用64位操做系統,Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 4g +免費磁盤用於Broker服務器

  

  我係統環境是OS 10.13.6,下載的是binary版本,省去了構建成爲二進制文件的步驟.bash

將rocketmq-all-4.4.0-bin-release.zip文件解壓後獲得rocketmq-all-4.4.0-bin-release文件夾,服務器

配置環境變量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夾父路徑,異步

例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.async

再將ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATHmaven

 

打開teiminal,輸入sh mqnamesrv啓動名稱服務器,以下圖:ide

能夠看到Java HotSpot(TM) 64-Bit Server VM warning「警告信息,不要怕,工具

它是說CMS垃圾收集器已通過時,在將來的JVM版本中會被其餘收集器替代,能夠不用管他.

namesrv已經正常啓動了

一樣,再開一個terminal,輸入sh mqbroker 啓動broker.

若是在啓動過程當中遇到:

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

 

不要慌,這是說你的JAVA_HOME環境變量設置有問題,請檢查環境變量.

若是檢查後發現環境變量沒問題,但仍是報以上錯誤,這多是系統問題吧,

記得OS系統有好幾個文件能夠配置環境變量,最多見的是用戶目錄下的.bash_profile,對當前用戶生效,還有一個全局的,在etc目錄下的profile文件

能夠再確認一下,若是兩個文件都配置了依然報錯,那隻好用最戳的方式解決:

打開mq的安裝目錄,找到bin文件夾下的runserver.sh和runbroker.sh,

在文件中主動加入你的JAVA_HOME,以下圖:

一切順利解決,至此namesrv和broker都已經成功啓動,下面來寫幾個Javademo

首先加入maven依賴包:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>

使用GRADLE構建的話加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'


使用GRADLE構建的話加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

使用RocketMQ有如下3種發送消息的方式,分別適用不一樣場景:

1.同步發送消息:可靠的同步傳輸用於普遍的場景,如重要的通知消息,短信通知,短信營銷系統等。

2.異步發送消息:異步傳輸一般用於響應時間敏感的業務場景。

3.以單向模式發送消息:單向傳輸用於須要中等可靠性的狀況,例如日誌收集。

咱們選第一種發送方式寫一個demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

經過循環發送100條消息,運行代碼,結果拋出以下錯誤:

該錯誤表示找不到這個topic的路由,關於這個錯誤,網上也有一些對策,無外乎如下幾個緣由:

1.broker沒有連上namesrv

2.producer沒有連上namesrv

3.namesrv沒有建立並維護該topic信息

4.netty包版本衝突

5.防火牆問題

前面3條能夠經過查找日誌肯定問題,

  1).broker.log裏面若是查到register broker to name server localhost:9876 OK 這樣的信息表示broker已經連上namesrv了.

若是沒有,那麼你能夠從新啓動broker: sh mqbroker -n localhost:9876

  2).namesrv.log裏面若是可以查到 new topic registered, TopicTest QueueData這樣的信息,表示你的topic也已經被成功建立.

若是沒有,那麼你能夠經過mq的admin工具主動生成該topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest

  3).producer有沒有連上namesrv,只需檢查一下代碼中namesrv的值對不對就好了

  4).至於4和5那就須要你本身去跟蹤源碼發現問題了.

 

如今再來運行生產者代碼,成功發送100條消息:

 

 消費者也來寫個demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
public static void main(String[] args) throws MQClientException {

// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");

// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

//Launch the consumer instance.
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

 成功消費100條消息:

 

 另外,關於rocketMQ的一些基本概念請參考另外一篇: http://www.javashuo.com/article/p-cyjwsvap-hm.html

 下次來看看本地僞集羣怎麼搭建: http://www.javashuo.com/article/p-gxrmdamh-hp.html

相關文章
相關標籤/搜索