繼我上一篇博客後 分佈式消息隊列RocketMQ學習教程① 上一篇博客最主要介紹了幾種經常使用的MQ,因此本博客再簡單介紹一下RocketMQ的原理和簡單的例子,基於Java實現,但願能夠幫助學習者html
「工於利其事,必先利其器」,因此咱們首先須要搭建好RocketMQ, 考慮到學習者不必定有Linux系統的服務器,因此本博客介紹一下Linux和Window系統的兩種安裝方法,以補充上一篇博客java
由於阿里已經將RocketMQ捐給Apache了,因此如今咱們須要去Apache官網下載 RocketMQ官網linux
注意RocketMQ是基於Java開發的,因此安裝前必須安裝JDK,Linux JDK安裝的能夠看分佈式消息隊列RocketMQ學習教程① 下載文件解壓後,能夠看到conf文件夾裏有2m-noslave、2m-2s-async、2m-2s-sync文件夾git
2m-noslave 兩主,無從的配置github
2m-2s-async 兩主,兩從,同步複製數據的配置redis
2m-2s-sync 兩主,兩從,異步複製數據的配置apache
咱們找到2m-noslave的broker-a.properties文件,修改完善配置 broker-a.properties編程
#所屬集羣名字 brokerClusterName=DefaultCluster #broker名字,注意此處不一樣的配置文件填寫的不同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分號分割 namesrvAddr=127.0.0.1:9876 #關鍵 brokerIP1=127.0.0.1 #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=48 #commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一個文件默認存30W條,根據業務狀況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #這裏是個人 日誌配置 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步複製Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=ASYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
先介紹一下linux系統的 通常將壓縮文件解壓到/usr/localvim
cd /usr/local tar -xzf apache-rocketmq.tar.gz mv apache-rocketmq rocketmq mkdir /usr/rocketmq/logs
環境變量配置api
vim /etc/profile
修改以下配置
export JAVA_HOME=/usr/java/jdk1.8.0_102 export ROCKETMQ_HOME=/usr/local/rocketmq export PATH=$PATH:$JAVA_HOME/bin:/usr/local/src/redis-3.2.8/bin:$ROCKETMQ_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib
啓動mqnamesrv
cd /usr/local/rocketmq/bin nohup sh /usr/local/rocketmq/bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &
啓動Broker
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties > /usr/local/rocketmq/logs/mqbroker.log 2>&1 &
要設置自動建立Topic,須要加上 autoCreateTopicEnable=true
關閉Broker服務 sh mqshutdown broker
啓動成功能夠用jps查看
一、下載RocketMQ後,解壓到D:\alibaba-rocketmq
二、在D:\alibaba-rocketmq,Ctrl+Shift,右鍵,打開dom界面,輸入以下命令行 start /b bin/mqnamesrv.exe >D:\alibaba-rocketmq\logs\mqnamesrv.log 查看nameserver是否啓動 jps -v
三、啓動Broker
start /b bin/mqbroker.exe -n "127.0.0.1:9876" autoCreateTopicEnable=true >D:\alibaba-rocketmq\logs\mqbroker.log
Caused by: com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, huang_1 See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details. at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:525) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1011) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:970) ~[rocketmq-client-3.5.3.jar:na] at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90) ~[rocketmq-client-3.5.3.jar:na] at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107) ~[ons-client-1.2.3.jar:na]
出現以上異常啓動時添加autoCreateTopicEnable=true
四、查看topic命令:mqadmin topicList -n "127.0.0.1:9876"
cd 到bin目錄,執行下面命令 mqadmin updateTopic -t test_1 -b "127.0.0.1:10911" -n "127.0.0.1:9876" 添加以下參數到eclipse啓動工程的VM參數裏 -Drocketmq.namesrv.addr=127.0.0.1:9876
須要去github下載,下載連接 rocketmq-console
下載後在rocketmq-console文件夾裏,ctrl+shift,右鍵,在此處打開命令窗口,打開cmd窗口,主要要先搭建好maven環境
mvn clean package -Dmaven.test.skip=true
打包完成以後,咱們去target文件夾找到rocketmq-console-ng-1.0.0.jar 而後
mkdir rocketmq-console cd /usr/local/rocketmq-console
使用xftp上傳rocketmq-console-ng-1.0.0.jar到/usr/local/rocketmq-console
nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876 >/usr/local/rocketmq-console/run.log 2>&1 &
端口檢查
netstat -anp|grep 12581
部署成功,打開http://服務器IP:12581
maven加入配置
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.0.10</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.0.10</version> <type>pom</type> </dependency>
消息隊列消費者消費消息實例
package com.mq.test; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; public class MQConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "mq-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("RocketMQ Consumer Started..."); } }
消息隊列生產者產生消息實例
package com.mq.test; import java.util.Date; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class MQProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("mq-group"); // producer.setNamesrvAddr("123.207.63.192:9876"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 10; i++) { Thread.sleep(1000); //MQ每隔一秒發送一條消息 Message msg = new Message("TopicA-test",// topic "TagA",// tag ("RocketMQ message"+i) .getBytes()// body ); SendResult sendResult = producer.send(msg);//發送消息 } } catch (Exception e) { e.printStackTrace(); } producer.shutdown();//關閉消息生產者 } }
下面是來自github wiki的學習例子
Filter網絡架構,以CPU資源換取寶貴的網卡流量資源
啓動Broker時,增長如下配置,能夠自動加載Filter Server進程
filterServerNums=1
Filter樣本(Consumer僅負責將代碼上傳到Filter Server,由Filter Server編譯後執行)
package com.alibaba.rocketmq.example.filter; import com.alibaba.rocketmq.common.filter.MessageFilter; import com.alibaba.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { String property = msg.getUserProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); if ((id % 3) == 0 && (id > 10)) { return true; } } return false; } }
Consumer例子
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); <br> consumer.start(); System.out.println("Consumer Started."); }
附錄