目錄java
64bit OS, Linux/Unix/Mac is recommended; 64bit JDK 1.8+; Maven 3.2.x; Git; 4g+ free disk for Broker server
unzip rocketmq-all-4.4.0-bin-release.zip cd rocketmq-all-4.4.0-bin-release/
# 在如下文件中替換默認的日誌路徑 ${user.home}/logs/rocketmqlogs/* conf/logback_broker.xml conf/logback_namesrv.xml conf/logback_tools.xml
conf 文件夾裏有三種配置git
##以修改 conf/2m-noslave/broker-a.properties 爲例 #所屬集羣名字 brokerClusterName=DefaultCluster #broker名字,注意此處不一樣的配置文件填寫的不同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分號分割 namesrvAddr=127.0.0.1:9876 #改爲服務器的本機ip,重要! brokerIP1=127.0.0.1 #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4 #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=48 #commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一個文件默認存30W條,根據業務狀況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #自定義數據存儲路徑 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步複製Master #- SYNC_MASTER 同步雙寫Master #- SLAVE brokerRole=ASYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
> nohup sh bin/mqnamesrv & > tail -f logs/rocketmqlogs/namesrv.log The Name Server boot success...
> nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-noslave/broker-a.propertie & > tail -f logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...
> sh bin/mqshutdown broker The mqbroker(36695) is running... Send shutdown request to mqbroker(36695) OK > sh bin/mqshutdown namesrv The mqnamesrv(36664) is running... Send shutdown request to mqnamesrv(36664) OK
> export NAMESRV_ADDR=localhost:9876 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...
發消息報錯github
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:588) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1223) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1173) at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214) at com.flying.demo.Producer.main(Producer.java:25)
緣由:沒有配置brokerIPdocker
建議docker 部署
非docker部署參見官方文檔 https://github.com/apache/rocketmq-externals/tree/master/rocketmq-consoleapache
docker pull styletang/rocketmq-console-ng docker run --name rocketmq-console -dit -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" -p 8080:8080 styletang/rocketmq-console-ng bash # 進入docker查看 docker exec -it rocketmq-console /bin/bash
部署完成,到對應的端口查看便可。能夠查看服務監控,歷史消息詳情,手動建立topic等。
bash
官方鏡像 rocketmqinc/rocketmq
部署步驟參見 http://www.justdojava.com/2019/08/26/rocketmq-creator/服務器
# 引入依賴 implementation 'org.apache.rocketmq:rocketmq-client:4.3.0'
package mq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws Exception { //建立一個消息生產者,並設置一個消息生產者組 DefaultMQProducer producer = new DefaultMQProducer("producer_test"); //指定 NameServer 地址 producer.setNamesrvAddr("127.0.0.1:9876"); //初始化 Producer,整個應用生命週期內只須要初始化一次 producer.start(); for (int i = 0; i < 5; i++) { //建立一條消息對象,指定其主題、標籤和消息內容,若是服務不支持自動建立topic的話,須要先手動建立topic,也能夠在rocketmq console裏操做 Message msg = new Message( "topic_test" /* 消息主題名 */, "TagA" /* 消息標籤 */, ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */ ); //發送消息並返回結果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } // 一旦生產者實例再也不被使用則將其關閉,包括清理資源,關閉網絡鏈接等 producer.shutdown(); } }
package mq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; import java.util.Date; import java.util.List; public class Consumer { public static void main(String[] args) throws Exception { //建立一個消息消費者,並設置一個消息消費者組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test"); //指定 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //設置 Consumer 第一次啓動時從隊列頭部開始消費仍是隊列尾部開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //訂閱指定 Topic 下的全部消息 consumer.subscribe("topic_test", "*"); //註冊消息監聽器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { //默認 list 裏只有一條消息,能夠經過設置參數來批量接收消息 if (list != null) { for (MessageExt ext : list) { try { System.out.println(new Date() + new String(ext.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 消費者對象在使用以前必需要調用 start 初始化 consumer.start(); System.out.println("消息消費者已啓動"); } }