爲了學習和方便測試,總是要啓動一個單機版的。下載 http://rocketmq.apache.org/dowloading/releases/
ROCKETMQ_HOME
E:\rocketmq-all-4.5.1-bin-release
啓動(先在 bin 目錄執行 mqnamesrv.cmd 啓動 NameServer,再啓動 Broker)
E:\rocketmq-all-4.5.1-bin-release\bin>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
可能會出現一個錯誤:找不到或無法加載主類 Files\Java\jdk1.8.0_161\lib;C:\Program
解決方法:(打開 bin 目錄的 runserver.cmd)
修改爲(給 %CLASSPATH% 加上英文雙引號,避免路徑中的空格把類路徑截斷):set CLASSPATH=.;%BASE_DIR%conf;"%CLASSPATH%"
重新啓動,成功
https://github.com/apache/rocketmq-externals
下載好後進入 rocketmq-console 目錄打包
mvn clean package -Dmaven.test.skip=true
進入 target 目錄,啓動(最後的參數是 nameserver 的地址,也就是我本機地址)
E:\rocketmq-externals-master\rocketmq-console\target>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876
最後訪問 http://localhost:8080 即可
引入依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version> </dependency>
一個簡單的生產者
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class Test { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 設置生產者組名 DefaultMQProducer producer = new DefaultMQProducer("my_producer_group"); // 設置NameServer地址 producer.setNamesrvAddr("10.204.241.15:9876"); // 啓動 producer.start(); for (int i = 0; i < 10; i++) { // 建立一條消息,包含topic、tag以及消息內容 Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發送結果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } // 不用的時候關閉 producer.shutdown(); } }
查看管控臺
查看詳細
下面一個簡單的消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.List; public class Test2 { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 設置生產者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group"); // 設置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 訂閱的主題 consumer.subscribe("MyTopic", "*"); // 註冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啓動 consumer.start(); System.out.printf("Consumer Started.%n"); } }
控制檯輸出
不要關閉消費者,查看管控臺