接上文,搭建好環境,用example中的示例只能進行有限的測試任務。Rocket-console沒法模擬發送和接收消息,因此自定義測試任務須要自行編寫demo程序。html
myeclipse 添加自定義jdk環境:參考文章Ajava
myeclipse添加自定義maven環境:參考文章B 、參考文章C markdown
File–New–Other–Maven Projuect–(Create a simple project)eclipse
直接把RocketMQ/devenv/lib
下的jar包都copy到剛建立的maven項目內maven
直接把RocketMQ的pom.xml的內容copy過去測試
src–New Package–New Class–Producer.javaui
內容能夠參考com.alibaba.rocketmq.example.quickstart(simple)
下的Producer,下文的Consumer相似spa
package com.alibaba.rocketmq.example.quickstart; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // tc_pro1爲Producer group name DefaultMQProducer producer = new DefaultMQProducer("tc_pro1"); // 手動指定Namesrv服務地址 producer.setNamesrvAddr("192.168.1.170:9876"); producer.setInstanceName("Producer1-tp1"); producer.start(); // 若是broker關閉了自動建立Topic功能,請手動添加Topic:tc_demo,以確保能正常發送消息 for (int i = 0; i < 1; i++) { try { Message msg = new Message("tc_demo",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); SendResult sendResult = producer.send(msg); LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() { public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { // TODO Auto-generated method stub return null; } }; //producer.sendMessageInTransaction(msg, tranExecuter, arg) System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
xxx Package–New Class–Consumer.java.net
package com.alibaba.rocketmq.example.quickstart; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer1 { public static void main(String[] args) throws InterruptedException, MQClientException { // tc_con1爲Consumer group name,若是broker關閉了自動訂閱功能,請手動添加訂閱tc_con1,以確保能正常接收消息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tc_con1"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 手動指定Namesrv服務地址 consumer.setNamesrvAddr("192.168.1.170:9876"); consumer.setInstanceName("Consumber1-tp1"); consumer.subscribe("tc_demo", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
前提:環境搭建成功,Namesrv 和 Broker服務運行正常,可經過jps
查看服務是否運行code
run Consumer.java /Producer.java 從myeclipse–console能夠看到Consumer角色成功啓動、Producer消息發送、Consumer接收消息。
至此,基於myeclipse上RocketMQ的demo實踐流程就走通了,更多的自定義擴展能夠參考其項目源碼
參考文章D:Producer多topic發送,Consumer多topic消費
mvn -version
顯示報錯Error: JAVA_HOME is set to an invalid directory.JAVA_HOME = 「C:\Program Files\Java\jdk1.7.0_17\bin」Please set the JAVA_HOME variable in your environment to match thelocation of your Java installation.
解決方案:jdk,maven的環境變量雖已在path裏設置完成,且jdk正常。但maven啓動另需JAVA_HOME,因此手動添加JAVA_HOME的值:xxx/java/jdk_1.7.xx (no /bin)
問題原由和解決方案:
Namesrv地址未指定或錯誤,請確認Namesrv地址
Namesrv或Broker未啓動,經過jps
查詢集羣(單機)節點服務狀態,若是沒有NamesrvStartup和BrokerStartup,從新啓動(能夠參看系列文章(二)/(三))
Broker關閉的自動建立topic和自動訂閱消費組的功能。調用mqadmin 下的 updateTopic 或updateSubGroup 建立topic或訂閱組