本文探討如何使用 RocketMQ Binder 完成 Spring Cloud 應用消息的訂閱和發佈。java
介紹
RocketMQ 是一款開源的分佈式消息系統,基於高可用分佈式集羣技術,提供低延時的、高可靠的消息發佈與訂閱服務,普遍應用於多個領域,包括異步通訊解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通訊、移動應用、手遊、視頻、物聯網、車聯網等。git
RocketMQ 是阿里巴巴在2012年開源的分佈式消息中間件,目前已經捐贈給 Apache 軟件基金會,並於2017年9月25日成爲 Apache 的頂級項目。做爲經歷過屢次阿里巴巴雙十一這種「超級工程」的洗禮並有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被愈來愈多的國內企業使用。github
RocketMQ特色
- 是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式等特色
- Producer、Consumer、隊列均可以分佈式
- Producer 向一些隊列輪流發送消息,隊列集合稱爲 Topic,Consumer 若是作廣播消費,則一個 Consumer 實例消費這個 Topic 對應的全部隊列,若是作集羣消費,則多個 Consumer 實例平均消費這個 Topic 對應的隊列集合
- 可以保證嚴格的消息順序
- 支持拉(pull)和推(push)兩種消息模式
- 高效的訂閱者水平擴展能力
- 實時的消息訂閱機制
- 億級消息堆積能力
- 支持多種消息協議,如 JMS、OpenMessaging 等
- 較少的依賴
Spring Cloud Stream
Spring Cloud Stream 是一個構建消息驅動微服務的框架。web
Spring Cloud Stream 提供了消息中間件配置的統一抽象,推出了 pub/sub,consumer groups,semantics,stateful partition 這些統一的模型支持。spring
Spring Cloud Stream 核心構件有:Binders、Bindings和Message,應用程序經過 inputs 或者 outputs 來與 binder 交互,經過咱們配置來 binding ,而 binder 負責與中間件交互,Message爲數據交換的統一數據規範格式。apache
- Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋樑,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據便可,屏蔽了開發者與底層消息中間件的接觸。編程
- Binder: 跟外部消息中間件集成的組件,用來建立 Binding,各消息中間件都有本身的 Binder 實現。
好比 Kafka
的實現 KafkaMessageChannelBinder
,RabbitMQ
的實現 RabbitMessageChannelBinder
以及 RocketMQ
的實現 RocketMQMessageChannelBinder
。json
- Message:是 Spring Framework 中的一個模塊,其做用就是統一消息的編程模型。
好比消息 Messaging 對應的模型就包括一個消息體 Payload 和消息頭 Header。服務器
Window搭建部署RocketMQ
下載
下載出來解壓到:D:\rocketmq 目錄,目錄最好不要帶空格和太深,不然服務運行可能會報錯
啓動NameServer服務
在啓動以前須要配置系統環境,否則會報錯。
Please set the ROCKETMQ_HOME variable in your environment!
系統環境變量名:ROCKETMQ_HOME
根據你解壓的目錄配置環境變量,好比個人變量值爲:D:\rocketmq
進入window命令窗口,進入D:\rocketmq\bin目錄下,執行
start mqnamesrv.cmd
如上則NameServer啓動成功。使用期間,窗口不要關閉。
啓動Broker服務
進入bin目錄下,輸入
start mqbroker.cmd -n localhost:9876
如上的 ip+port 是rocketmq的服務地址和端口。
運行如上命令,可能會報以下錯誤。找不到或沒法加載主類
若是出此狀況,打開bin-->runbroker.cmd,修改%CLASSPATH%成"%CLASSPATH%"
保存再次執行如上命令。執行成功後,提示boot success 表明成功。
示例
本示例實現三種消息的發佈以及訂閱接收。
建立 RocketMQ 消息生產者
建立 ali-rocketmq-producer 工程,端口爲:28081
- pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud-alibaba</artifactId> <groupId>com.easy</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ali-rocketmq-producer</artifactId> <packaging>jar</packaging> <dependencies> <!--rocketmq依賴--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> <!--web依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
- 配置 Output 的 Binding 信息並配合
@EnableBinding
註解使其生效
application.yml配置
server: port: 28081 spring: application: name: ali-rocketmq-producer cloud: stream: rocketmq: binder: # RocketMQ 服務器地址 name-server: 127.0.0.1:9876 bindings: output1: {destination: test-topic1, content-type: application/json} output2: {destination: test-topic2, content-type: application/json} management: endpoints: web: exposure: include: '*' endpoint: health: show-details: always
ArProduceApplication.java
@SpringBootApplication @EnableBinding({MySource.class}) public class ArProduceApplication { public static void main(String[] args) { SpringApplication.run(ArProduceApplication.class, args); } }
- 消息生產者服務
MySource.java
package com.easy.arProduce; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface MySource { @Output("output1") MessageChannel output1(); @Output("output2") MessageChannel output2(); }
SenderService.java
package com.easy.arProduce; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.MimeTypeUtils; @Service public class SenderService { @Autowired private MySource source; /** * 發送字符串 * * @param msg */ public void send(String msg) { Message message = MessageBuilder.withPayload(msg) .build(); source.output1().send(message); } /** * 發送帶tag的字符串 * * @param msg * @param tag */ public void sendWithTags(String msg, String tag) { Message message = MessageBuilder.withPayload(msg) .setHeader(RocketMQHeaders.TAGS, tag) .build(); source.output1().send(message); } /** * 發送對象 * * @param msg * @param tag * @param <T> */ public <T> void sendObject(T msg, String tag) { Message message = MessageBuilder.withPayload(msg) .setHeader(RocketMQHeaders.TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); source.output2().send(message); } }
編寫 TestController.java 控制器方便測試
package com.easy.arProduce; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = "test") public class TestController { @Autowired SenderService senderService; @RequestMapping(value = "/send", method = RequestMethod.GET) public String send(String msg) { senderService.send(msg); return "字符串消息發送成功!"; } @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET) public String sendWithTags(String msg) { senderService.sendWithTags(msg, "tagStr"); return "帶tag字符串消息發送成功!"; } @RequestMapping(value = "/sendObject", method = RequestMethod.GET) public String sendObject(int index) { senderService.sendObject(new Foo(index, "foo"), "tagObj"); return "Object對象消息發送成功!"; } }
建立 RocketMQ 消息消費者
建立 ali-rocketmq-consumer 工程,端口爲:28082
- pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud-alibaba</artifactId> <groupId>com.easy</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ali-rocketmq-consumer</artifactId> <packaging>jar</packaging> <dependencies> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-配置 Input 的 Binding 信息並配合 @EnableBinding
註解使其生效
application.yml配置
server: port: 28082 spring: application: name: ali-rocketmq-consumer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 #rocketmq 服務地址 bindings: input1: {consumer.orderly: true} #是否排序 input2: {consumer.tags: tagStr} #訂閱 帶tag值爲tagStr的字符串 input3: {consumer.tags: tagObj} #訂閱 帶tag值爲tabObj的字符串 bindings: input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1} input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1} input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1} management: endpoints: web: exposure: include: '*' endpoint: health: show-details: always
ArConsumerApplication.java
package com.easy.arConsumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @SpringBootApplication @EnableBinding({MySource.class}) public class ArConsumerApplication { public static void main(String[] args) { SpringApplication.run(ArConsumerApplication.class, args); } }
- 消息消費者服務
MySource.java
package com.easy.arConsumer; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface MySource { @Input("input1") SubscribableChannel input1(); @Input("input2") SubscribableChannel input2(); @Input("input3") SubscribableChannel input3(); }
ReceiveService.java
package com.easy.arConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; @Service @Slf4j public class ReceiveService { @StreamListener("input1") public void receiveInput1(String receiveMsg) { log.info("input1 接收到了消息:" + receiveMsg); } @StreamListener("input2") public void receiveInput2(String receiveMsg) { log.info("input2 接收到了消息:" + receiveMsg); } @StreamListener("input3") public void receiveInput3(@Payload Foo foo) { log.info("input3 接收到了消息:" + foo); } }
使用示例
示例關聯項目
本示例咱們建立了兩個項目實現
-
ali-rocketmq-producer:RocketMQ 消息服務生產者,服務名:ali-rocketmq-producer,端口:28081
-
ali-rocketmq-consumer:RocketMQ 消息服務消費者,服務名:ali-rocketmq-producer,端口:28082
運行示例測試
首先要啓動ali-rocketmq-producer服務及ali-rocketmq-consumer服務
- 訪問消息服務生產者地址: http://localhost:28081/test/send?msg=yuntian
查看服務消費者控制檯,輸出
2019-12-04 15:37:47.859 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input1 接收到了消息:yuntian 2019-12-04 15:37:47.859 INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms
表示字符串消費成功被input1消費了
查看服務消費者控制檯,輸出
2019-12-04 15:38:09.586 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input2 接收到了消息:tagyuntian 2019-12-04 15:38:09.592 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input1 接收到了消息:tagyuntian 2019-12-04 15:38:09.592 INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms
表示帶tag的字符串成功被input2和input1消費了,由於input1也訂閱了test-topic1,而且沒有咱們沒有加tag過濾,默認表示接收全部消息,因此也能成功接收tagyuntian字符串
- 訪問消息服務生產者地址: http://localhost:28081/test/sendObject?index=1
查看服務消費者控制檯,輸出
2019-12-04 15:41:15.285 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input3 接收到了消息:Foo{id=1, bar='foo'}
表示input3成功接收到了tag帶tagObj的對象消息了,而input1卻沒有輸出消息,這是由於sendObject發佈的消息走的是test-topic2消息管道,因此不會發布給input1及input2訂閱者