有段時間沒寫這些技術文章了, 今天抽空寫一點,否則本身都快忘記了 這篇文章記錄了rocketmq 集羣方式搭建的過程, 也是本身半天的成果記錄吧! 感興趣的朋友點個贊在走唄!php
好了,廢話很少,下面開搞。html
本文章參考http://www.javashuo.com/article/p-kbmdwwtl-mh.html 這個博客文章編寫java
第一步:關閉要搭建的全部機器的防火牆 第二步:每臺機器執行下以下步驟linux
[root@ma01 ~]# vim /etc/sysconfig/selinux ...... SELINUX=disabled [root@ma01~]# setenforce 0 [root@ma01~]# getenforce
第三步:全部機器裝好jdk, maven , zip , unzip , ssh 免密登陸apache
配置crt鏈接: https://blog.csdn.net/cmqwan/article/details/61932792 安裝maven參考老哥博客: https://www.cnblogs.com/clicli/p/5866390.html 安裝zip,unzip參考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch= 安裝ssh參考: https://blog.csdn.net/m0_37590135/article/details/74275859 jdk本身百度哈, 不少參考博客的!
第四步: 以下命令是ssh機器之間copy用的命令vim
scp -r /home/administrator/test/ root@192.168.1.100:/root/
第五步: 下載完成後, 解壓服務器
unzip rocketmq-all-4.4.0-bin-release.zip
第六步:進入解壓後的文件夾rocketmq-bin4.4.0 , 在文件夾裏面新建logs , data/store, data2/store 目錄 第七步:安裝順序修改bin下面的幾個啓動文件, 因默認配置內存空間太大,本地啓動會報錯ssh
1. vim runbroker.sh 對應地方更改成 -server -Xms512m -Xmx512m -Xmn256m 2. vim runserver.sh (一樣的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m 3. vim tools.sh -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m
第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改這四個文件 async
第九步: 130主機器修改以下配置文件, broker-a.properties broker-b-s.properties兩個文件 內容分別以下 broker-a.propertiesmaven
brokerClusterName=RocketMQCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH ##Broker 對外服務的監聽端口 listenPort=10911 #nameserver地址,分號分割 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true brokerIP1=192.168.175.130 storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue #消息索引存儲路徑 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint #abort 文件存儲路徑 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一個文件默認存300W條,根據業務狀況調整 mapedFileSizeConsumeQueue=3000000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 #diskMaxUsedSpaceRatio=88
broker-b-s.properties
brokerClusterName=RocketMQCluster brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=10921 #nameserver地址,分號分割 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 brokerIP1=192.168.175.130 #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2 # 消費隊列存儲路徑存儲路徑 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2 #消息索引存儲路徑 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2 #checkpoint 文件存儲路徑 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2 #abort 文件存儲路徑 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一個文件默認存300W條,根據業務狀況調整 mapedFileSizeConsumeQueue=3000000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 #diskMaxUsedSpaceRatio=88
第十步: 131, 132 機器只修改 broker-b.properties 和broker-a-s.properties 內容分別以下: broker-b.properties
# limitations under the License. brokerClusterName=RocketMQCluster brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10911 #nameserver地址,分號分割 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 brokerIP1=192.168.175.131 #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue #消息索引存儲路徑 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint #abort 文件存儲路徑 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一個文件默認存300W條,根據業務狀況調整 mapedFileSizeConsumeQueue=3000000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 #diskMaxUsedSpaceRatio=88
broker-a-s.properties
# limitations under the License. brokerClusterName=RocketMQCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=10921 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 brokerIP1=192.168.175.131 #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue #消息索引存儲路徑 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint #abort 文件存儲路徑 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 # commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一個文件默認存30W條,根據業務狀況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 #diskMaxUsedSpaceRatio=88
第十一步: 啓動
三臺都執行: nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 & 130機器執行: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 & 131, 132 機器執行: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &
執行以後,jps結果,有兩個brokerstartup就好了, 若是報錯的化,看下本身建的logs文件夾日誌
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.5.9</version> <type>pom</type> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies>
import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; /** * 消息發送者 * @author LELE * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // 聲明並初始化一個producer // 須要一個producer group名字做爲構造方法的參數,這裏爲producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); producer.setVipChannelEnabled(false); // 設置NameServer地址,此處應改成實際NameServer地址,多個地址之間用;分隔 // NameServer的地址必須有 // producer.setClientIP("xxxx"); // producer.setInstanceName("Producer"); producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876"); // 調用start()方法啓動一個producer實例 producer.start(); // 發送1條消息到Topic爲TopicTest,tag爲TagA,消息內容爲「Hello RocketMQ」拼接上i的值 try { for(int i=0;i<30000;i++) { // 封裝消息 Message msg = new Message("TopicTest", // topic "TagA", // tag ("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); // 調用producer的send()方法發送消息 // 這裏調用的是同步的方式,因此會有返回結果 SendResult sendResult = producer.send(msg); // 打印返回結果 System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // 發送完消息以後,調用shutdown()方法關閉producer System.out.println("send success"); producer.shutdown(); } }
import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; /** * 消息接收者, 須要服務器啓動mq服務 * @author LELE * */ public class Consumer { public static void main(String[] args) throws MQClientException { // 聲明並初始化一個consumer // 須要一個consumer group名字做爲構造方法的參數,這裏爲consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); // consumer.setVipChannelEnabled(false); // 一樣也要設置NameServer地址 consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876"); // 這裏設置的是一個consumer的消費策略 // CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 // CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍 // CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 設置consumer所訂閱的Topic和Tag,*表明所有的Tag consumer.subscribe("TopicTest", "*"); // 設置一個Listener,主要進行消息的邏輯處理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest")) { // 執行TopicTest1的消費邏輯 System.out.println("TagA:" + new String(msg.getBody())); } System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size() + "----------------------------------------------------------------------------------"); // 返回消費狀態 // CONSUME_SUCCESS 消費成功 // RECONSUME_LATER 消費失敗,須要稍後從新消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 調用start()方法啓動consumer consumer.start(); System.out.println("Consumer Started."); } }
啓動開始盡情玩耍吧,少年, 記得點贊哦!