本文快速入門,RocketMQ消息系統的安裝部署,發送,和接收消息,監控消息,的詳細說明。java
64位操做系統,建議使用Linux / Unix /git
請參考個人另外一篇文章github
www.ymq.io/2018/02/01/…spring
新建一個 maven 項目,這裏就不詳細操做了,你們都會的apache
不過也能夠下載個人示例源碼,下載地址以下app
GitHub 源碼:github.com/souyunku/sp…maven
在POM 中添加以下依賴ide
<!-- RocketMq客戶端相關依賴 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.1.0-incubating</version>
</dependency>
複製代碼
在配置文件 application.properties
添加一下內容spring-boot
# 消費者的組名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生產者的組名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=192.168.252.121:9876
複製代碼
@Component
public class Producer {
/** * 生產者的組名 */
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/** * NameServer 地址 */
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQProducer() {
//生產者的組名
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多個地址以 ; 隔開
producer.setNamesrvAddr(namesrvAddr);
try {
/** * Producer對象在使用以前必需要調用start初始化,初始化一次便可 * 注意:切記不能夠在每次發送消息時,都調用start方法 */
producer.start();
for (int i = 0; i < 100; i++) {
String messageBody = "我是消息內容:" + i;
String message = new String(messageBody.getBytes(), "utf-8");
//構建消息
Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag */, "key_" + i /* Keys */, message.getBytes());
//發送消息
SendResult result = producer.send(msg);
System.out.println("發送響應:MsgId:" + result.getMsgId() + ",發送狀態:" + result.getSendStatus());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
複製代碼
@Component
public class Consumer {
/** * 消費者的組名 */
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
/** * NameServer地址 */
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消費者的組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr(namesrvAddr);
try {
//訂閱PushTopic下Tag爲push的消息
consumer.subscribe("PushTopic", "push");
//設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費
//若是非第一次啓動,那麼按照上次消費的位置繼續消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);//輸出消息內容
String messageBody = new String(messageExt.getBody(), "utf-8");
System.out.println("消費響應:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//輸出消息內容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍後再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
}
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
@SpringBootApplication
public class SpringBootRocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootRocketmqApplication.class, args);
}
}
複製代碼
控制檯會有響應
發送響應:MsgId:0AFF015E556818B4AAC208A0504F0063,發送狀態:SEND_OK
messageExt: MessageExt [queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]]
消費響應:Msg: 0AFF015E556818B4AAC208A0504F0063,msgBody: 我是消息內容:99
...
複製代碼
RocketMQ web界面監控RocketMQ-Console-Ng部署
下載而且 maven 編譯
git clone https://github.com/apache/rocketmq-externals.git
rocketmq-externals/rocketmq-console/
mvn clean package -Dmaven.test.skip=true
複製代碼
rocketmq.config.namesrvAddr
NameServer
地址,默認啓動端口8080
nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876
複製代碼
GitHub 源碼:github.com/souyunku/sp…
Gitee 源碼:gitee.com/souyunku/sp…