1、RocketMQ機器硬件要求內存最好不要低於8G, 系統linux,且已經安裝好JDKjava
2、安裝文件下載地址:
http://mirror.bit.edu.cn/apache/incubator/rocketmq/4.0.0-incubating/rocketmq-all-4.0.0-incubating-bin-release.ziplinux
3、下載RocketMQ安裝文件並上傳到服務器上後apache
解壓 bash
unzip rocketmq-all-4.0.0-incubating-bin-release.zip
進入到解壓目錄下的bin目錄中服務器
啓動 NameServer:maven
nohup sh mqnamesrv &
啓動 brokerui
nohup sh mqbroker -n localhost:9876 &
使用jps命令能夠看到有如下兩個java程序運行中.net
爲了便於其它機器調試訪問,可臨時將防火牆關閉:調試
service firewalld stop
4、Java 程序democode
maven依賴包:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.0.0-incubating</version> </dependency>
消息生產者demo代碼:
package com.classtest.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args){ DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("192.168.133.141:9876"); try { producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test1.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test2.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "1", "Just for test3.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } } }
消息消費者Java代碼demo:
package com.classtest.rocketmq; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("192.168.133.141:9876"); try { //訂閱PushTopic下Tag爲push的消息 consumer.subscribe("PushTopic", "push"); //程序第一次啓動從消息隊列頭取數據 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
能夠先運行消費者demo, 而後運行生產者demo時能夠消費者demo的運行窗口輸出消息
參考: