rocketmq安裝部署過程(4.0.0版本)

  • 準備工做

 

  • 3個虛擬機節點的構成以下

 

  • 安裝步驟

 

  • 操做過程

一、安裝包已經上傳至其中1個節點。java

 

二、解壓縮安裝包git

命令:unzip rocketmq-all-4.0.0-incubating-bin-release.zipgithub

解壓縮以後以下:apache

 

三、 我這裏將解壓縮以後的文件夾移動了位置,並修改了名字,以便後續操做。dom

命令: mv /home/hadmin/software/apache-rocketmq-all/ /home/hadmin/rocketmqide

移動以後路徑以下:post

 

 四、修改配置文件測試

我這裏已經將配置文件提早準備好了,只呈現如下配置文件的結果。ui

■節點1(192.168.6.3)配置文件:spa

《broker-a-m.properties》

複製代碼

brokerClusterName=post
brokerName=broker-a
namesrvAddr=192.168.6.3:9876;192.168.6.4:9876
brokerId=0
listenPort=10911
brokerIP1=192.168.6.3
deleteWhen=04
fileReservedTime=72
brokerRole=ASYNC_MASTER
storePathRootDir=/home/hadmin/data/rocketmq/rootdir
storePathCommitLog=/home/hadmin/data/rocketmq/commitlog
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

複製代碼

《broker-c-s.properties》 

複製代碼

brokerClusterName=post
brokerName=broker-c
namesrvAddr=192.168.6.3:9876;192.168.6.4:9876
brokerId=1
listenPort=10920
brokerIP1=192.168.6.3
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/home/hadmin/data/rocketmq/rootdir
storePathCommitLog=/home/hadmin/data/rocketmq/commitlog
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

複製代碼

 

■節點2(192.168.6.4)配置文件:

《broker-a-s.properties》

複製代碼

brokerClusterName=post
brokerName=broker-a
namesrvAddr=192.168.6.3:9876;192.168.6.4:9876
brokerId=1
listenPort=10920
deleteWhen=04
brokerIP1=192.168.6.4
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/home/hadmin/data/rocketmq/rootdir
storePathCommitLog=/home/hadmin/data/rocketmq/commitlog
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

複製代碼

 

《broker-b-m.properties》

複製代碼

brokerClusterName=post
brokerName=broker-b
namesrvAddr=192.168.6.3:9876;192.168.6.4:9876
brokerId=0
listenPort=10911
brokerIP1=192.168.6.4
deleteWhen=04
fileReservedTime=72
brokerRole=ASYNC_MASTER
storePathRootDir=/home/hadmin/data/rocketmq/rootdir
storePathCommitLog=/home/hadmin/data/rocketmq/commitlog
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

複製代碼

 

■節點3(192.168.6.5)配置文件:

《broker-b-s.properties》 

複製代碼

brokerClusterName=post
brokerName=broker-b
namesrvAddr=192.168.6.3:9876;192.168.6.4:9876
brokerId=1
listenPort=10920
brokerIP1=192.168.6.5
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/home/hadmin/data/rocketmq/rootdir
storePathCommitLog=/home/hadmin/data/rocketmq/commitlog
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

複製代碼

 

《broker-c-m.properties》 

複製代碼

brokerClusterName=post
brokerName=broker-c
namesrvAddr=192.168.6.3:9876;192.168.6.4:9876
brokerId=0
listenPort=10911
brokerIP1=192.168.6.5
deleteWhen=04
fileReservedTime=72
brokerRole=ASYNC_MASTER
storePathRootDir=/home/hadmin/data/rocketmq/rootdir
storePathCommitLog=/home/hadmin/data/rocketmq/commitlog
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

複製代碼

 

五、啓動nameserver

啓動命令:nohup sh /home/hadmin/rocketmq/bin/mqnamesrv >/home/hadmin/rocketmq/logs/mqnamesrv.log 2>&1 &

注意:我這裏將啓動日誌重定義到了logs路徑下,須要提早手動建立logs文件夾,以便於統一管理日誌,方便查看。

建立文件夾命令:mkdir /home/hadmin/rocketmq/logs

下圖中展現了啓動命令,日誌中能夠看到NameServer成功啓動的日誌。

■NameServer - 節點1

■NameServer = 節點2

 

六、啓動Broker-a(Master位於節點一、Slave位於節點2)

broker-a分爲Master和Slave,分別位於節點1和節點2上,須要分別啓動。

注意:須要根據啓動角色,爲broker指定一個配置文件。

■broker-a的master - 節點1

命令: nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-a-m.properties >/home/hadmin/rocketmq/logs/broker-a-m.log 2>&1 &

 

■broker-a的slave - 節點2

命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-a-s.properties >/home/hadmin/rocketmq/logs/broker-a-s.log 2>&1 &

■驗證broker-a:

broker-a啓動結束,這時候可使用命令查看一下rocketmq集羣狀態。

命令:sh /home/hadmin/rocketmq/bin/mqadmin clusterList -n 192.168.6.3:9876

 

七、啓動broker-b(Master位於節點2,、Slave位於節點3)

■broker-b的Master - 節點2

命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-b-m.properties >/home/hadmin/rocketmq/logs/broker-b-m.log 2>&1 &

■broker-b的Slave - 節點3

命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-b-s.properties >/home/hadmin/rocketmq/logs/broker-b-s.log 2>&1 &

■驗證broker-b

命令:sh /home/hadmin/rocketmq/bin/mqadmin clusterList -n 192.168.6.3:9876

 

八、 啓動broker-b(Master位於節點3,、Slave位於節點1)

■broker-c的Master - 節點3

命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-c-m.properties >/home/hadmin/rocketmq/logs/broker-c-m.log 2>&1 &

 

■broker-c的Slave - 節點1

命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-c-s.properties >/home/hadmin/rocketmq/logs/broker-c-s.log 2>&1 &

 

■驗證broker-c

命令:sh /home/hadmin/rocketmq/bin/mqadmin clusterList -n 192.168.6.3:9876

 

  • 問題1:

啓動broker的時候提示內存不夠的錯誤。 

解決方法:因爲我的電腦配置不夠,沒法爲虛擬機申請更大的內存。因此,採用修改broker啓動內存的方式解決了。

修改文件路徑:{ROCKET_HOME}/bin/runbroker.sh

修改前:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

修改後:JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"

如圖所示:

從新啓動broker,日誌中沒有錯誤,jps進程中也能夠看到broker正常啓動。

 

  • 問題2:

在同一臺機器上啓動多個broker的時候提示以下錯誤。

問題緣由:

按照本文最開始的額圖片所示,同一臺機器上會存在兩個Broker,若是不進行特殊指定,broker的默認端口是10911。

因此一臺機器上啓動兩個broker時,第二個broker就會出現端口被佔用的錯誤。

解決辦法:

修改rocketmq的配置文件,增長listenPort配置。配置以後以下所示:

→同一臺機器的配置文件

→broker-a-m.properties

→broker-c-s.properties

 

使用jps查看一下進程是否有問題

在查看一下啓動日誌是否有問題

 

最後使用clusterList命令來驗證一下集羣健康狀態。

命令:sh bin/mqadmin clusterList -n 192.168.6.3:9876

  成功實現了,如本文最開始圖片所示的安裝部署。

 

  • 使用程序測試

編寫了java程序代碼嘗試向集羣中生產消息,程序代碼以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

package post;

 

import org.apache.rocketmq.client.exception.MQBrokerException;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.exception.RemotingException;

 

import java.util.UUID;

 

public class ProducerTest {

    private static DefaultMQProducer producer = null;

 

    public static void main(String[] args) {

        System.out.print("[----------]Start");

        boolean result = false;

        try {

            ProducerStart();

            SendMessage("qch_20170706", "hello rocketmq!");

        }finally {

            producer.shutdown();

        }

        System.out.print("[----------]Succeed");

    }

 

    private static boolean ProducerStart() {

        producer = new DefaultMQProducer("pro_qch_test");

        producer.setNamesrvAddr("192.168.6.3:9876;192.168.6.4:9876");

        producer.setInstanceName(UUID.randomUUID().toString());

        producer.setVipChannelEnabled(false);

        try {

            producer.start();

        } catch (MQClientException e) {

            e.printStackTrace();

            return false;

        }

        return true;

    }

 

    private static boolean SendMessage(String topic, String str) {

        Message msg = new Message(topic, str.getBytes());

        try {

            producer.send(msg);

        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {

            e.printStackTrace();

            return false;

        }

        return true;

    }

}

運行以後,日誌中提示下面的錯誤:

 

問題緣由:

由於broker部署在虛擬機,而且虛擬雙網卡,client沒法正常鏈接服務端。

 

解決方法:

能夠在broker的配置文件中配置brokerIP1(本機IP)屬性。

 

修改後配置文件以下圖所示:

 

修改以後,從新啓動rocketmq集羣,運行生產者程序,確認正常結束。

 

而後,有嘗試這編寫消費者代碼,驗證是否能夠正常消費。結果正常,這裏貼一下代碼及結果日誌。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

package post;

 

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

 

import java.util.List;

import java.util.UUID;

 

public class ConsumerTest {

    public static void main(String[] args) {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");

        consumer.setInstanceName(UUID.randomUUID().toString());

        consumer.setConsumeMessageBatchMaxSize(32);

        consumer.setNamesrvAddr("192.168.6.3:9876;192.168.6.4:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(

                    List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                for(MessageExt me : list) {

                    System.out.print(new String(me.getBody()));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

        try {

            consumer.subscribe("qch_20170706", "*");

            consumer.start();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

 

樣例代碼上傳到了git上了,有須要的能夠去參考。

https://github.com/quchunhui/rocketmq_sample

相關文章
相關標籤/搜索