- 主從模式環境能夠保障消息的即時性與可靠性
- 投遞一條消息後,關閉主節點
- 從節點繼續能夠提供消費者數據進行消費,可是不能接收消息
- 主節點從新上線後會自動進行消費進度offset的同步
準備兩臺機器,一主一從:java
機器IP | hostname | 角色 |
---|---|---|
192.168.243.169 | rocketmq01 | master |
192.168.243.170 | rocketmq02 | slave |
我這裏事先在兩臺機器上安裝好了RocketMQ,關於RocketMQ的安裝能夠參考以下文章:apache
接下來,咱們開始搭建RocketMQ主從集羣。首先,配置兩臺機器的hosts
:vim
$ vim /etc/hosts 192.168.243.169 rocketmq-nameserver1 rocketmq-master1 192.168.243.170 rocketmq-nameserver2 rocketmq-slave1
修改master節點的配置文件:bash
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a.properties [root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a.properties #節點所屬的集羣名稱 brokerClusterName=rocketmq-cluster #broker 名字,注意此處不一樣的配置文件填寫的不同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer 地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在發送消息時,自動建立服務器不存在的 topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立 Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4 點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog 每一個文件的大小默認 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每一個文件默認存 30W 條,根據業務狀況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq-4.7.1/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq-4.7.1/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq-4.7.1/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步複製 Master #- SYNC_MASTER 同步雙寫 Master #- SLAVE brokerRole=ASYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
修改slave節點的配置文件:服務器
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a-s.properties [root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a-s.properties #節點所屬的集羣名稱 brokerClusterName=rocketmq-cluster #broker 名字,注意此處不一樣的配置文件填寫的不同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 #nameServer 地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在發送消息時,自動建立服務器不存在的 topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立 Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4 點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog 每一個文件的大小默認 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每一個文件默認存 30W 條,根據業務狀況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq-4.7.1/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq-4.7.1/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq-4.7.1/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步複製 Master #- SYNC_MASTER 同步雙寫 Master #- SLAVE brokerRole=SLAVE #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
而後將這兩個配置文件拷貝到Slave節點上:app
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties [root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a-s.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties
完成配置後,就能夠啓動RocketMQ了,在master節點上執行以下命令:dom
[root@rocketmq01 ~]# nohup sh mqnamesrv & [root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
在slave節點上執行以下命令:異步
[root@rocketmq02 ~]# nohup sh mqnamesrv & [root@rocketmq02 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
啓動完成後,分別在兩個節點上檢查下服務的進程和端口是否正常:async
[root@rocketmq01 ~]# jps 1942 Jps 1739 NamesrvStartup 1775 BrokerStartup [root@rocketmq01 ~]# netstat -lntp |grep java tcp6 0 0 :::10909 :::* LISTEN 1775/java tcp6 0 0 :::10911 :::* LISTEN 1775/java tcp6 0 0 :::10912 :::* LISTEN 1775/java tcp6 0 0 :::9876 :::* LISTEN 1739/java [root@rocketmq01 ~]#
修改RocketMQ的管控臺配置,並啓動:tcp
[root@rocketmq01 ~]# cd /usr/local/src/rocketmq-externals/rocketmq-console/ [root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# vim src/main/resources/application.properties # 增長nameserver的地址 rocketmq.config.namesrvAddr=192.168.243.169:9876;192.168.243.170:9876 [root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# java -jar target/rocketmq-console-ng-2.0.0.jar
此時在管控臺中能夠看到有兩個節點了:
在Dashboard中也能夠看到有兩個Broker:
主從集羣模式下的高可用機制故障演練
建立一個普通的Maven項目,pom
文件添加rocketmq-client
依賴以下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
生產者代碼示例:
package com.zj.rocketmq.learn.quickstart; import com.zj.rocketmq.learn.constant.Constants; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.UUID; /** * rocketmq - 生產者 * * @author 01 * @date 2020-11-30 **/ public class Producer { public static void main(String[] args) throws Exception { // 在rocketmq中生產者必須在一個生產者組內 String producerGroup = "quickstart_producer_group"; DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設置nameserver的地址 producer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESSES); // 啓動生產者 producer.start(); // 消息投遞的目標主題 String topic = "quickstart_topic"; // 給消息打一個標籤,標籤的主要做用是用來過濾的 String tag = "quickstart_tag"; // 給消息設置一個key,是消息的惟一標識 String key = UUID.randomUUID().toString(); // 消息體,即具體的消息內容 String body = "this is quickstart message!"; Message message = new Message(topic, tag, key, body.getBytes()); // 發送消息 SendResult sendResult = producer.send(message); System.out.println("消息發送結果:" + sendResult); producer.shutdown(); } }
常量類代碼:
public class Constants { public static final String NAME_SERVER_ADDRESSES = "192.168.243.169:9876;192.168.243.170:9876"; }
運行生產者代碼發送一條消息,控制檯輸出以下:
消息發送結果:SendResult [sendStatus=SEND_OK, msgId=C0A8010B36502437C6DC998FAEE00000, offsetMsgId=C0A8F3A900002A9F0000000000033234, messageQueue=MessageQueue [topic=quickstart_topic, brokerName=broker-a, queueId=1], queueOffset=0]
此時將主節點給停掉,模擬宕機:
[root@rocketmq01 ~]# mqshutdown broker
而後編寫消費者端,代碼以下:
package com.zj.rocketmq.learn.quickstart; import com.zj.rocketmq.learn.constant.Constants; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; /** * rocketmq - 消費者 * * @author 01 * @date 2020-11-30 **/ public class Consumer { public static void main(String[] args) throws Exception { // 定義消費者組 String consumerGroup = "quickstart_consumer_group"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); // 設置nameserver的地址 consumer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESS); // 設置從哪一個位置開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 從哪一個主題消費數據 String topic = "quickstart_topic"; // 用於匹配消息標籤的表達式 String subExpression = "*"; // 訂閱主題 consumer.subscribe(topic, subExpression); // 註冊消息監聽器,在監聽器中實現消息的處理邏輯 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.println("------------- 接收到消息,開始進行業務處理 -------------"); for (MessageExt msg : msgs) { try { System.out.printf("topic: %s, tags: %s, keys: %s, body: %s%n", msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody())); if ("0".equals(msg.getKeys())) { throw new RuntimeException("模擬業務處理髮生異常"); } } catch (Exception e) { e.printStackTrace(); int reconsumeTimes = msg.getReconsumeTimes(); System.err.println("reconsumeTimes: " + reconsumeTimes); if (reconsumeTimes == 3) { // TODO 重試次數達到閾值,放棄重試,記錄日誌後續作補償... System.out.println("重試次數達到閾值,放棄重試!"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // 消息處理失敗時返回,因爲Broker的重試機制,會從新消費該消息 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 消息處理成功時返回 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 啓動消費者 consumer.start(); System.out.println("consumer started..."); } }
運行消費者,正常狀況下,該消費者依舊可以消費到數據:
從新啓動master節點,讓其從新加入集羣:
[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
在此過程注意查看消費者的控制檯,正常狀況下,master從新加入集羣,消費者也不會重複消費,由於master會和slave同步offset進度。