前面幾篇文章介紹了爲何選擇RocketMQ,以及與kafka的一些對比: 阿里 RocketMQ 優點對比,方便你們對於RocketMQ有一個簡單的總體瞭解,以後介紹了:MQ 應用場景,讓咱們知道MQ在何時能夠使用,能夠解決什麼問題,以後介紹了:RocketMQ集羣部署配置;本篇文章接着上篇內容以後,來給你們介紹下RocketMQ快速入門。java
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
複製代碼
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改成本身的
/** * Producer對象在使用以前必需要調用start初始化,初始化一次便可 * 注意:切記不能夠在每次發送消息時,都調用start方法 */
producer.start();
for (int i = 0; i < 997892; i++) {
try {
//構建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//發送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
複製代碼
/** * Consumer Group,很是重要的概念,後續會慢慢補充 */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改成本身的
/** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費 * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg:msgs){
String msgbody = new String(msg.getBody(), "utf-8");
System.out.println(" MessageBody: "+ msgbody);//輸出消息內容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍後再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
複製代碼
各位根據本身的環境,修改NamesrvAddr的值,個人集羣請參考:RocketMQ集羣部署配置。稍後經過RocketMQ管控臺就能夠看到以前搭建的多Master多Slave模式,異步複製集羣模式。git
rocketmq-console-ng獲取方式爲:rocketmq-console-ng,以後經過mavne進行編譯獲取jar,命令以下:github
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar
複製代碼
獲得rocketmq-console-ng-1.0.0.jar以後,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據本身的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值。spring
直接啓動:apache
java -jar rocketmq-console-ng-1.0.0.jar
複製代碼
一個好的習慣是先運行Consumer,以後在運行Producer,以後經過rocketmq-console-ng管控臺觀察springboot
運行完成以後,的確broker-a的數據加上broker-b的數據量就等於咱們發送的數據量,並且slave的數量也master的數量也是一致的,效果以下:app
查看發送這些數據,2臺機器的磁盤狀況以下: rocketmq1佔用磁盤空間異步
到目前位置,關於RocketMQ快速入門就結束了,未完待續……maven
若是讀完以爲有收穫的話,歡迎點贊加關注。ide
我的公衆號,歡迎關注,查閱更多精彩歷史!!!