RocketMQ主從集羣模式搭建

  • 主從模式環境能夠保障消息的即時性與可靠性
  • 投遞一條消息後,關閉主節點
  • 從節點繼續能夠提供消費者數據進行消費,可是不能接收消息
  • 主節點從新上線後會自動進行消費進度offset的同步

準備兩臺機器,一主一從:java

機器IP hostname 角色
192.168.243.169 rocketmq01 master
192.168.243.170 rocketmq02 slave

我這裏事先在兩臺機器上安裝好了RocketMQ,關於RocketMQ的安裝能夠參考以下文章:apache

接下來,咱們開始搭建RocketMQ主從集羣。首先,配置兩臺機器的hostsvim

$ vim /etc/hosts
192.168.243.169 rocketmq-nameserver1 rocketmq-master1
192.168.243.170 rocketmq-nameserver2 rocketmq-slave1

修改master節點的配置文件:bash

[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a.properties
#節點所屬的集羣名稱
brokerClusterName=rocketmq-cluster
#broker 名字,注意此處不一樣的配置文件填寫的不同
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分號分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動建立服務器不存在的 topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立 Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4 點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog 每一個文件的大小默認 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每一個文件默認存 30W 條,根據業務狀況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/rocketmq-4.7.1/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq-4.7.1/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/rocketmq-4.7.1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複製 Master
#- SYNC_MASTER 同步雙寫 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128

修改slave節點的配置文件:服務器

[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a-s.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a-s.properties
#節點所屬的集羣名稱
brokerClusterName=rocketmq-cluster
#broker 名字,注意此處不一樣的配置文件填寫的不同
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer 地址,分號分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動建立服務器不存在的 topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立 Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4 點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog 每一個文件的大小默認 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每一個文件默認存 30W 條,根據業務狀況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/rocketmq-4.7.1/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq-4.7.1/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/rocketmq-4.7.1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複製 Master
#- SYNC_MASTER 同步雙寫 Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128

而後將這兩個配置文件拷貝到Slave節點上:app

[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a-s.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties

完成配置後,就能夠啓動RocketMQ了,在master節點上執行以下命令:dom

[root@rocketmq01 ~]# nohup sh mqnamesrv &
[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

在slave節點上執行以下命令:異步

[root@rocketmq02 ~]# nohup sh mqnamesrv &
[root@rocketmq02 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

啓動完成後,分別在兩個節點上檢查下服務的進程和端口是否正常:async

[root@rocketmq01 ~]# jps
1942 Jps
1739 NamesrvStartup
1775 BrokerStartup
[root@rocketmq01 ~]# netstat -lntp |grep java
tcp6       0      0 :::10909                :::*              LISTEN      1775/java           
tcp6       0      0 :::10911                :::*              LISTEN      1775/java           
tcp6       0      0 :::10912                :::*              LISTEN      1775/java           
tcp6       0      0 :::9876                 :::*              LISTEN      1739/java           
[root@rocketmq01 ~]#

修改RocketMQ的管控臺配置,並啓動:tcp

[root@rocketmq01 ~]# cd /usr/local/src/rocketmq-externals/rocketmq-console/
[root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# vim src/main/resources/application.properties 
# 增長nameserver的地址
rocketmq.config.namesrvAddr=192.168.243.169:9876;192.168.243.170:9876
[root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# java -jar target/rocketmq-console-ng-2.0.0.jar

此時在管控臺中能夠看到有兩個節點了:
RocketMQ主從集羣模式搭建

在Dashboard中也能夠看到有兩個Broker:
RocketMQ主從集羣模式搭建


主從集羣模式下的高可用機制故障演練

建立一個普通的Maven項目,pom文件添加rocketmq-client依賴以下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

生產者代碼示例:

package com.zj.rocketmq.learn.quickstart;

import com.zj.rocketmq.learn.constant.Constants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.UUID;

/**
 * rocketmq - 生產者
 *
 * @author 01
 * @date 2020-11-30
 **/
public class Producer {

    public static void main(String[] args) throws Exception {
        // 在rocketmq中生產者必須在一個生產者組內
        String producerGroup = "quickstart_producer_group";
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        // 設置nameserver的地址
        producer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESSES);
        // 啓動生產者
        producer.start();

        // 消息投遞的目標主題
        String topic = "quickstart_topic";
        // 給消息打一個標籤,標籤的主要做用是用來過濾的
        String tag = "quickstart_tag";
        // 給消息設置一個key,是消息的惟一標識
        String key = UUID.randomUUID().toString();
        // 消息體,即具體的消息內容
        String body = "this is quickstart message!";
        Message message = new Message(topic, tag, key, body.getBytes());
        // 發送消息
        SendResult sendResult = producer.send(message);
        System.out.println("消息發送結果:" + sendResult);
        producer.shutdown();
    }
}

常量類代碼:

public class Constants {

    public static final String NAME_SERVER_ADDRESSES = "192.168.243.169:9876;192.168.243.170:9876";
}

運行生產者代碼發送一條消息,控制檯輸出以下:

消息發送結果:SendResult [sendStatus=SEND_OK, msgId=C0A8010B36502437C6DC998FAEE00000, offsetMsgId=C0A8F3A900002A9F0000000000033234, messageQueue=MessageQueue [topic=quickstart_topic, brokerName=broker-a, queueId=1], queueOffset=0]

此時將主節點給停掉,模擬宕機:

[root@rocketmq01 ~]# mqshutdown broker

而後編寫消費者端,代碼以下:

package com.zj.rocketmq.learn.quickstart;

import com.zj.rocketmq.learn.constant.Constants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * rocketmq - 消費者
 *
 * @author 01
 * @date 2020-11-30
 **/
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 定義消費者組
        String consumerGroup = "quickstart_consumer_group";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 設置nameserver的地址
        consumer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESS);
        // 設置從哪一個位置開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 從哪一個主題消費數據
        String topic = "quickstart_topic";
        // 用於匹配消息標籤的表達式
        String subExpression = "*";
        // 訂閱主題
        consumer.subscribe(topic, subExpression);

        // 註冊消息監聽器,在監聽器中實現消息的處理邏輯
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.println("------------- 接收到消息,開始進行業務處理 -------------");
            for (MessageExt msg : msgs) {
                try {
                    System.out.printf("topic: %s, tags: %s, keys: %s, body: %s%n",
                            msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));

                    if ("0".equals(msg.getKeys())) {
                        throw new RuntimeException("模擬業務處理髮生異常");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    int reconsumeTimes = msg.getReconsumeTimes();
                    System.err.println("reconsumeTimes: " + reconsumeTimes);
                    if (reconsumeTimes == 3) {
                        // TODO 重試次數達到閾值,放棄重試,記錄日誌後續作補償...
                        System.out.println("重試次數達到閾值,放棄重試!");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    // 消息處理失敗時返回,因爲Broker的重試機制,會從新消費該消息
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            // 消息處理成功時返回
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 啓動消費者
        consumer.start();
        System.out.println("consumer started...");
    }
}

運行消費者,正常狀況下,該消費者依舊可以消費到數據:
RocketMQ主從集羣模式搭建

從新啓動master節點,讓其從新加入集羣:

[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

在此過程注意查看消費者的控制檯,正常狀況下,master從新加入集羣,消費者也不會重複消費,由於master會和slave同步offset進度。

相關文章
相關標籤/搜索