rocketmq模型如上圖所示,分爲以下幾個部分:java
由上各部分角色的功能可知,咱們須要先安裝啓動NameServer,再啓動Broker便可搭建完RocketMQspring
首先下載鏡像:docker
docker pull rocketmqinc/rocketmq:4.4.0
複製代碼
啓動NameServer,暴露9876端口apache
docker run --name rmqnamesrv -d -p 9876:9876 rocketmqinc/rocketmq:4.4.0 sh mqnamesrv 複製代碼
啓動完成後,能夠curl 9876端口測試服務是否啓動成功bash
RocketMQ是Java編寫的程序,Broker和NameServer都在上面的鏡像中,只是啓動命令不一樣而已。服務器
啓動Brokermarkdown
docker run --name rmqbroker -d -p 10911:10911 -p 10909:10909 --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq:4.4.0 sh mqbroker 複製代碼
--link 將NameServer容器起個別名,Broker中須要配置一個NAMESRV_ADDR參數指向NameServer地址。app
同上,這裏也能夠使用curl localhost:10911驗證下服務器是否啓動curl
這一個步驟不作也能夠經過Java等客戶端訪問到RocketMQ了,不過有可視化界面便於觀察RocketMQ數據。不須要的能夠跳過這一步maven
下載鏡像:
docker pull pangliang/rocketmq-console-ng
複製代碼
啓動容器:
docker run --name rmqconsole -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" pangliang/rocketmq-console-ng 複製代碼
自此,也能夠使用curl命令測試控制檯界面是否成功啓動。curl localhost:8080,以下表示啓動成功。
宿主機也能夠登陸訪問控制檯界面。
maven中先導入apache官方提供的starter
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> 複製代碼
application.yml配置一個name-server地址,具體值看你的機器。
這裏也能夠經過accessKey和secureKey登陸鏈接。默認配置在RocketMQ的配置文件中,默認值是:
accessKey: RocketMQ
secureKey: 12345678
複製代碼
生產者發送消息:
@RestController public class RocketController { @Autowired private RocketMQTemplate rocketMQTemplate; // 發送給Broker,默認會自動建立topic,topic和tag用冒號分隔 @GetMapping("/rocket/send") public String rocketSend() { LocalDateTime currentTime = LocalDateTime.now(); rocketMQTemplate.convertAndSend("rocket-topic-1", currentTime.toString()); return currentTime.toString(); } // 延時消息,RocketMQ支持這幾個級別的延時消息,不能自定義時長 // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h @GetMapping("/rocket/delayMsg/send") public String rocketDelayMsgSend() { LocalDateTime currentTime = LocalDateTime.now(); rocketMQTemplate.syncSend("rocket-topic-1:tag-2", MessageBuilder.withPayload(currentTime.toString()).build(), 2000, 3); return currentTime.toString(); } } 複製代碼
消費者:
@Component @Slf4j public class RokcetServiceListener { @Service @RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-1") public class Consumer1 implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("consumer1 rocket收到消息:{}", s); } } // RocketMQ支持兩種消費方式,集器消費和廣播消費 @Service @RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-1", selectorExpression = "tag2", messageModel = MessageModel.BROADCASTING) public class Consumer2 implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("consumer2 rocket收到消息:{}", s); } } } 複製代碼