RocketMq 集羣方式搭建 步驟教學包教包會

mq集羣方式搭建

有段時間沒寫這些技術文章了, 今天抽空寫一點,否則本身都快忘記了 這篇文章記錄了rocketmq 集羣方式搭建的過程, 也是本身半天的成果記錄吧! 感興趣的朋友點個贊在走唄!php

好了,廢話很少,下面開搞。html

本文章參考http://www.javashuo.com/article/p-kbmdwwtl-mh.html 這個博客文章編寫java

準備工做

第一步:關閉要搭建的全部機器的防火牆 第二步:每臺機器執行下以下步驟linux

[root@ma01 ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[root@ma01~]# setenforce 0
[root@ma01~]# getenforce

第三步:全部機器裝好jdk, maven , zip , unzip , ssh 免密登陸apache

配置crt鏈接: https://blog.csdn.net/cmqwan/article/details/61932792
安裝maven參考老哥博客:  https://www.cnblogs.com/clicli/p/5866390.html
安裝zip,unzip參考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch=
安裝ssh參考: https://blog.csdn.net/m0_37590135/article/details/74275859
jdk本身百度哈, 不少參考博客的!

第四步: 以下命令是ssh機器之間copy用的命令vim

scp -r /home/administrator/test/ root@192.168.1.100:/root/

在這裏插入圖片描述 第五步: 下載完成後, 解壓服務器

unzip  rocketmq-all-4.4.0-bin-release.zip

第六步:進入解壓後的文件夾rocketmq-bin4.4.0 , 在文件夾裏面新建logs , data/store, data2/store 目錄 在這裏插入圖片描述 第七步:安裝順序修改bin下面的幾個啓動文件, 因默認配置內存空間太大,本地啓動會報錯ssh

1. vim runbroker.sh    對應地方更改成  -server -Xms512m -Xmx512m -Xmn256m
2. vim runserver.sh (一樣的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m
3. vim tools.sh           -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m

第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改這四個文件 在這裏插入圖片描述async

注: 這裏說明下哈, 我是用了三臺機器,全部配置了130, 131和132同樣的,大家2臺機器徹底能夠用,131和132配置一臺就能夠了哈,ip自行更改哈。

第九步: 130主機器修改以下配置文件, broker-a.properties broker-b-s.properties兩個文件 內容分別以下 broker-a.propertiesmaven

brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
##Broker 對外服務的監聽端口
listenPort=10911
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
#在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true
brokerIP1=192.168.175.130
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每一個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每一個文件默認存300W條,根據業務狀況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

broker-b-s.properties

brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.130
#在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每一個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每一個文件默認存300W條,根據業務狀況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

第十步: 131, 132 機器只修改 broker-b.properties 和broker-a-s.properties 內容分別以下: broker-b.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每一個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每一個文件默認存300W條,根據業務狀況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

broker-a-s.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

listenPort=10921
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每一個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每一個文件默認存30W條,根據業務狀況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

第十一步: 啓動

三臺都執行:

nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 &

130機器執行: 
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 &

131, 132 機器執行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties  -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &

執行以後,jps結果,有兩個brokerstartup就好了, 若是報錯的化,看下本身建的logs文件夾日誌 在這裏插入圖片描述

好了,到此rocketmq 基礎配置就搭建起來了,下面在講一講實戰代碼


導入依賴包

<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.4.0</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-all</artifactId>
			<version>3.5.9</version>
			<type>pom</type>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

mq消息發送方

import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * 消息發送者
 * @author LELE
 *
 */
public class Producer {

	public static void main(String[] args) throws MQClientException, InterruptedException {

		// 聲明並初始化一個producer
		// 須要一個producer group名字做爲構造方法的參數,這裏爲producer1
		DefaultMQProducer producer = new DefaultMQProducer("producer1");
		producer.setVipChannelEnabled(false);
		// 設置NameServer地址,此處應改成實際NameServer地址,多個地址之間用;分隔
		// NameServer的地址必須有
		// producer.setClientIP("xxxx");
		// producer.setInstanceName("Producer");
		producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

		// 調用start()方法啓動一個producer實例
		producer.start();

		// 發送1條消息到Topic爲TopicTest,tag爲TagA,消息內容爲「Hello RocketMQ」拼接上i的值
		try {
			for(int i=0;i<30000;i++) {
				// 封裝消息
				Message msg = new Message("TopicTest", // topic
						"TagA", // tag
						("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
				);
				// 調用producer的send()方法發送消息
				// 這裏調用的是同步的方式,因此會有返回結果
				SendResult sendResult = producer.send(msg);
				// 打印返回結果
				System.out.println(sendResult);
			}
			
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		// 發送完消息以後,調用shutdown()方法關閉producer
		System.out.println("send success");
		producer.shutdown();
	}

}

消息消費者

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 消息接收者, 須要服務器啓動mq服務
 * @author LELE
 *
 */
public class Consumer {

	public static void main(String[] args) throws MQClientException {

		// 聲明並初始化一個consumer
		// 須要一個consumer group名字做爲構造方法的參數,這裏爲consumer1
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
		// consumer.setVipChannelEnabled(false);
		// 一樣也要設置NameServer地址
		consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

		// 這裏設置的是一個consumer的消費策略
		// CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
		// CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍
		// CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		// 設置consumer所訂閱的Topic和Tag,*表明所有的Tag
		consumer.subscribe("TopicTest", "*");

		// 設置一個Listener,主要進行消息的邏輯處理
		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

				MessageExt msg = msgs.get(0);

				if (msg.getTopic().equals("TopicTest")) {

					// 執行TopicTest1的消費邏輯

					System.out.println("TagA:" + new String(msg.getBody()));

				}

				System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()
						+ "----------------------------------------------------------------------------------");
				// 返回消費狀態
				// CONSUME_SUCCESS 消費成功
				// RECONSUME_LATER 消費失敗,須要稍後從新消費
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		// 調用start()方法啓動consumer
		consumer.start();
		System.out.println("Consumer Started.");

	}
}

啓動開始盡情玩耍吧,少年, 記得點贊哦!

相關文章
相關標籤/搜索