微服務架構中,消息隊列和遠程服務調用已經是兩大必不可少的組件,而RocketMQ和Dubbo正是阿里系貢獻的對應的兩大精品開源,做爲兩個已經獲得普遍應用的框架,好好學習研究是必需的。html
官方文檔:https://github.com/alibaba/RocketMQ/wiki/quick-start
根據文檔說明,須要如下軟件來完成這個快速開始示例:
⑴ 64bit OS, best to have Linux/Unix/Mac;
⑵ 64bit JDK 1.6+;
⑶ Maven 3.x
⑷ Git
⑸ Screenjava
做爲純Java程序,RocketMQ在Windows下也是能夠運行的,官方還準備了exe執行文件方便Windows環境下進行開發部署。
Windows下的編譯部署大同小異,有興趣能夠參考下面這個網址:
http://blog.csdn.net/ruishenh/article/details/22390809linux
若是Linux已經自帶JDK,能夠使用命令查看JDK版本,若是版本不符合64位1.6+,須要先卸載舊版本而後安裝新版本。
我安裝的是jdk-8u111-linux-x64.rpm,過程略過,有問題能夠參考下面的網址。
下載地址:http://www.oracle.com/technetwork/java/javase/downloads/index.htm
安裝教程:http://www.cnblogs.com/benio/archive/2010/09/14/1825909.html
按理32位JDK也是能夠運行的,只是須要調整內存配置,但可能不適合生產環境。未經測試,不妄言。git
1.3.1 下載github
cd /usr/javawork wget http://mirrors.hust.edu.cn/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
1.3.2 解壓算法
tar -zxvf apache-maven-3.3.9-bin.tar.gz
1.3.3 設置環境變量shell
vi /etc/profile
文件末尾添加兩行配置:apache
export M2_HOME=/usr/javawork/apache-maven-3.3.9 export PATH=$PATH:$M2_HOME/bin
退出vi執行命令使其生效:windows
source /etc/profile
1.3.4 添加alibaba的Maven倉庫鏡像(下載速度飛快)bash
vi /usr/javawork/apache-maven-3.3.9/conf/settings.xml
在<mirrors>項下添加鏡像信息:
<mirror> <id>alimaven</id> <name>aliyun maven</name> <mirrorOf>central</mirrorOf> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </mirror>
yum install git
若是已下載RocketMQ源碼包,Git能夠無需安裝。shell安裝腳本中有git pull命令,若是未安裝git,會提示command not found,但不影響後面的編譯。
若是嫌煩,vi install.sh打開文件刪掉 git pull這條命令便可。
yum install screen
screen 非必需,但安裝後切換會話很是方便。官方文檔中使用了這條命令,因此仍是裝上較好。
命令介紹:http://www.cnblogs.com/mchina/archive/2013/01/30/2880680.html
git clone https://github.com/alibaba/RocketMQ.git cd RocketMQ bash install.sh
若是編譯成功,最終獲得的目錄以下圖。devenv是軟連接,源文件在target目錄。
設置環境變量
cd devenv echo "ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile
使環境變量生效:
source ~/.bash_profile
screen bash mqnamesrv
若是未安裝screen,能夠使用下面這條命令:
nohup sh mqnamesrv &(加 & 能夠後臺運行,不然Ctrl+c命令退出當前會話,服務會中止)
啓動成功的信息:
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
備註:出現三條警告信息是由於JDK1.8已經不支持設置方法區大小和指定CMS垃圾收集算法進行FullGC,後面會講如何去掉這些信息。
先按ctrl+a,而後再按d掛起當前會話,而後再執行如下命令:
screen bash mqbroker -n localhost:9876
啓動成功後除了警告信息外,會出現如下信息:
3.3.1 修改xml文件
cd bin vi mqadmin.xml vi mqbroker.xml vi mqnamesrv.xml vi mqfiltersrv.xml
依次打開這些xml文件,並刪除下圖紅色框中的標記的配置信息。
3.3.2 修改啓動文件
vi runserver.sh vi runbroker.sh
依次打開這兩個文件,去除下圖紅色標記的配置信息。
如圖, 黃色部分是內存設置, 由於當前是虛擬機搭建的開發環境, 因此內存調整成以下:
runserver.sh: -Xms512m -Xmx512m -Xmn256m
runbroker.sh: -Xms2g -Xmx2g -Xmn512m
關閉Name Server
sh mqshutdown namesrv
關閉Broker
sh mqshutdown broker
cd ~/logs/rocketmqlogs/
3.6.1 設置地址
export NAMESRV_ADDR=localhost:9876
3.6.2 測試命令
生產者:
bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer
消費者:
bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
/**生產者*/ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 一個應用建立一個Producer,由應用來維護此對象,能夠設置爲全局對象或者單例<br> * 注意:ProducerGroupName須要由應用來保證惟一<br> * ProducerGroup這個概念發送普通的消息時,做用不大,可是發送分佈式事務消息時,比較關鍵, * 由於服務器會回查這個Group下的任意一個Producer */ final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.0.11:9876"); producer.setInstanceName("Producer"); /** * Producer對象在使用以前必需要調用start初始化,初始化一次便可<br> * 注意:切記不能夠在每次發送消息時,都調用start方法 */ producer.start(); /** * 下面這段代碼代表一個Producer對象能夠發送多個topic,多個tag的消息。 * 注意:send方法是同步調用,只要不拋異常就標識成功。可是發送成功也可會有多種狀態,<br> * 例如消息寫入Master成功,可是Slave不成功,這種狀況消息屬於成功,可是對於個別應用若是對消息可靠性要求極高,<br> * 須要對這種狀況作處理。另外,消息可能會存在發送失敗的狀況,失敗重試由應用來處理。 */ for (int i = 0; i < 10; i++) { try { { Message msg = new Message("TopicTest1", // topic "TagA", // tag "OrderID001", // key ("Hello MetaQA").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2", // topic "TagB", // tag "OrderID0034", // key ("Hello MetaQB").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3", // topic "TagC", // tag "OrderID061", // key ("Hello MetaQC").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 應用退出時,要調用shutdown來清理資源,關閉網絡鏈接,從MetaQ服務器上註銷本身 * 注意:咱們建議應用在JBOSS、Tomcat等容器的退出鉤子裏調用shutdown方法 */ // producer.shutdown(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } }
/** 消費者 */ public class PushConsumer { /** * 當前例子是PushConsumer用法,使用方式給用戶感受是消息從RocketMQ服務器推到了應用客戶端。<br> * 可是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ服務器拉消息,而後再回調用戶Listener方法<br> */ public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一個應用建立一個Consumer,由應用來維護此對象,能夠設置爲全局對象或者單例<br> * 注意:ConsumerGroupName須要由應用來保證惟一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("192.168.0.11:9876"); consumer.setInstanceName("Consumber"); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 訂閱指定topic下全部消息<br> * 注意:一個consumer對象能夠訂閱多個topic */ consumer.subscribe("TopicTest2", "*"); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 執行TopicTest1的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執行TagA的消費 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 執行TagC的消費 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 執行TagD的消費 System.out.println(new String(msg.getBody())); } } else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用以前必需要調用start初始化,初始化一次便可<br> */ consumer.start(); System.out.println("ConsumerStarted."); } }
分別運行以上兩個程序,正常會輸出以下信息:
生產者端:
SendResult [sendStatus=SEND_OK, msgId=C0A80008326873D16E9381E943250000,offsetMsgId=C0A8000B00002A9F0000000000031862, messageQueue=MessageQueue [topic=TopicTest1, brokerName=localhost.localdomain, queueId=2], queueOffset=9] SendResult [sendStatus=SEND_OK, msgId=C0A80008326873D16E9381E943540001,offsetMsgId=C0A8000B00002A9F0000000000031921, messageQueue=MessageQueue [topic=TopicTest2, brokerName=localhost.localdomain, queueId=0], queueOffset=10] SendResult [sendStatus=SEND_OK, msgId=C0A80008326873D16E9381E943630002,offsetMsgId=C0A8000B00002A9F00000000000319E1, messageQueue=MessageQueue [topic=TopicTest3, brokerName=localhost.localdomain, queueId=2], queueOffset=11]
消費者端:
ConsumerStarted. ConsumeMessageThread_1 Receive New Messages: 1 Hello MetaQA ConsumeMessageThread_4 Receive New Messages: 1 Hello MetaQA
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1
See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details.
若是出現以上錯誤,是由於啓動Broker時後面的主機和端口未指定
screen bash mqbroker -n localhost:9876
JDK:JDK_1.8.111_x64
O S:Centos_6.5_x64
RocketMQ:3.5.8