一、生產者:java
package com.ebways.mq.test.mq; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; /** * Created by gmq on 2016/10/13 0013. */ public class NewRocketProductor { public static void main(String[] args) throws MQClientException, Exception { /** * 一個應用建立一個Producer,由應用來維護此對象,能夠設置爲全局對象或者單例<br> * 注意:ProducerGroupName須要由應用來保證惟一<br> * ProducerGroup這個概念發送普通的消息時,做用不大,可是發送分佈式事務消息時,比較關鍵, * 由於服務器會回查這個Group下的任意一個Producer */ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.100.190:9876"); producer.setInstanceName("Producer"); /** * Producer對象在使用以前必需要調用start初始化,初始化一次便可<br> * 注意:切記不能夠在每次發送消息時,都調用start方法 */ producer.start(); /** * 下面這段代碼代表一個Producer對象能夠發送多個topic,多個tag的消息。 * 注意:send方法是同步調用,只要不拋異常就標識成功。可是發送成功也可會有多種狀態,<br> * 例如消息寫入Master成功,可是Slave不成功,這種狀況消息屬於成功,可是對於個別應用若是對消息可靠性要求極高,<br> * 須要對這種狀況作處理。另外,消息可能會存在發送失敗的狀況,失敗重試由應用來處理。 */ for (int i = 0; i < 10; i++) { try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2",// topic "TagB",// tag "OrderID0034",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 應用退出時,要調用shutdown來清理資源,關閉網絡鏈接,從MetaQ服務器上註銷本身 * 注意:咱們建議應用在JBOSS、Tomcat等容器的退出鉤子裏調用shutdown方法 */ producer.shutdown(); } }
二、消費者服務器
package com.ebways.mq.test.mq; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Administrator on 2016/10/14 0014. */ public class NewRocketConsumer { public static void main(String[] args) throws Exception { /** * 一個應用建立一個Consumer,由應用來維護此對象,能夠設置爲全局對象或者單例<br> * 注意:ConsumerGroupName須要由應用來保證惟一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("192.168.100.190:9876"); consumer.setInstanceName("Consumber"); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 訂閱指定topic下全部消息<br> * 注意:一個consumer對象能夠訂閱多個topic */ consumer.subscribe("TopicTest2", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默認msgs裏只有一條消息,能夠經過設置consumeMessageBatchMaxSize參數來批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.err.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 執行TopicTest1的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執行TagA的消費 System.err.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 執行TagC的消費 System.err.println("TagC:====="); System.err.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 執行TagD的消費 System.err.println("TagD:====="); System.err.println(new String(msg.getBody())); } } else if (msg.getTopic().equals("TopicTest2")) { System.err.println("TopicTest2:====="); System.err.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用以前必需要調用start初始化,初始化一次便可<br> */ consumer.start(); System.err.println("Consumer Started."); } }