在使用MQ以前,咱們回顧一下前兩篇博文的內容.java
RocketMQ
的四個概念,分別是:Producer
,Consumer
,Message
和Broker
RocketMQ
和其後臺系統在本篇博文中,咱們會使用使用SpringBoot構建兩個微服務,一個做爲生產者,一個做爲消費者,經過RocketMQ
傳遞消息,瞭解在Java
中使用RocketMQ的方法.git
在灰皮書第一篇文章中,我畫了下面這個圖:github
如今咱們本地的RocketMQ
也部署起來了,接下來咱們建立兩個微服務經過MQ來收發消息,實現基本的流程.spring
首先咱們建立兩個基於SpringBoot
的微服務,分別是:apache
rocketmq-consumer
消息消費者rocketmq-producer
消息生產者兩個服務裏面,rocketmq-consumer
的端口號是2001,rocketmq-producer
的端口號是2002springboot
分別在兩個微服務寫兩個測試方法,啓動測試:app
rocketmq-consumeride
@RestController public class ConsumerController { @GetMapping("/consumer") public String index() { return "rocketmq-consumer"; } }
rocketmq-producerspring-boot
@RestController public class ProducerController { @GetMapping("/producer") public String index() { return "rocketmq-producer"; } }
啓動測試,兩個接口都成功訪問.微服務
根據咱們最上面的圖,服務A發送消息到服務B,在這裏,咱們用rocketmq-producer
來發送消息,消息發送到rocketmq
之後,由服務Brocketmq-consumer
消費消息.
使用rocketmq發送消息有不少種方式,由於咱們使用的是SpringBoot
,這裏直接使用官方提供的rocketmq-spring-boot-starter
包來開發
在github
上有個項目:RocketMQ-Spring
它就是RocketMq官方提供的整合了SpringBoot
的rocketmq工具包,git地址以下:https://github.com/apache/rocketmq-spring
固然,你也可使用原生的rocketmq-client
包,在官方的示例中,使用的就是這種方式,具體能夠查看官方文檔,下面咱們直接使用rocketmq-spring-boot-starter
來發送消息.
咱們能夠看到有不少的版本能夠用:
這裏咱們使用2.0.3
這個版本吧,具體的官方細節能夠查看https://github.com/apache/rocketmq-spring/blob/release-2.0.3/README_zh_CN.md
首先是pom座標:
<!--add dependency in pom.xml--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
而後再rocketmq-producer
的配置文件中配置rocketmq的name-server
和group
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=producer
rocketmq-spring-boot-starter
中提供了一個RocketMQTemplate
來方便咱們發送消息,咱們能夠直接注入這個類來使用.
RocketMQTemplate
有send
方法和convertAndSend
方法,均可以用來發送消息,區別是,前者的方法入參是rocketmq
規定的Message
類型,然後者能夠發送對象,而且幫咱們轉換,源碼以下:
/** * Send a message to the given destination. * @param destination the target destination * @param message the message to send */ void send(D destination, Message<?> message) throws MessagingException; /** * Convert the given Object to serialized form, possibly using a * {@link org.springframework.messaging.converter.MessageConverter}, * wrap it as a message and send it to a default destination. * @param payload the Object to use as payload */ void convertAndSend(Object payload) throws MessagingException;
下面咱們直接發送消息到mq
@Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/producer") public String index() { rocketMQTemplate.convertAndSend("test-topic", "消息發送成功!"); return "rocketmq-producer"; }
convertAndSend
方法有兩個參數,第一個參數是消息要發送到的topic
,也就是目的地,第二個參數就是消息自己,至於topic究竟是什麼,這個咱們後面詳細來講,咱們只須要知道,咱們的消息發送到了rocketmq
的一個叫作test-topic的地方便可.
而且,因爲咱們在灰皮書第二章的時候,啓動mq的時候,指定了autoCreateTopicEnable=true
,也就是說,咱們使用RocketMQTemplate
發送的消息,就算topic以前不存在,rocket也會幫咱們建立好.
編碼完成,重啓項目,咱們只要訪問http://localhost:2002/producer
就會發送消息到mq,咱們能夠經過rocketmq-console
查看咱們發送的消息
能夠看到mq自動爲咱們建立了topic:
在message頁籤,能夠查看到咱們剛纔發送的消息:
詳細的消息內容:
在上面的例子中,咱們直接發送字符串到MQ,通常來講,咱們發送的消息體是一個java對象,在這裏也是能夠的,咱們改造一下代碼:
@GetMapping("/producer") public String index() { rocketMQTemplate.convertAndSend("test-topic", new User("張三", 20)); return "rocketmq-producer"; } @Data class User implements Serializable { private static final long serialVersionUID = -3486413003967431764L; private String name; private Integer age; User() {} User(String name, Integer age) { this.name = name; this.age = age; } }
這樣咱們發送了一個User對象到RocketMQ
中,咱們再去rocketmq-console
查看:
能夠看到,消息成功發送到了mq中,須要注意的是,這裏咱們發送的對象要實現Serializable
接口,否則會拋異常.
那麼咱們發送的消息的內容是怎麼序列化的呢?
RocketMQ的消息體都是以
byte[]
方式存儲的,若是內容體是java.lang.String
類型時,統一按照UTF-8
編碼轉成byte[]
;若是消息內容不是String類型的,則採用jackson-databind
序列化成JSON格式的字符串後,再統一按照UTF-8
編碼轉換成byte[]
以上釋義源於RocketMQ
官方文檔,因此說,有問題多看看官方文檔能很大程度上解決咱們的疑惑!
好了,咱們的消息發送成功了,接下來咱們在rocketmq-consumer
應用中消費以前發送出來的消息.
在開發以前咱們先想一下: 消息的生產者隨着用戶的請求,不斷的往MQ中發送消息,那麼消費者在消費消息的時候,是怎麼知道它要取哪一條消息呢?
咱們以前的文章中提到過一個topic
,生產者在發送消息的時候,會指定一個topic,消息會發送到某個topic下,那麼天然而然的,消費者在獲取消息的時候,也是須要知道它要從哪一個topic
裏面去獲取消息的.
而獲取消息,則是經過監聽器
來完成的.
建立一個監聽器:
@Component @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer") @Slf4j public class Consumerlistener implements RocketMQListener<User> { @Override public void onMessage(User message) { log.info("收到消息 : {}", message); } }
@RocketMQMessageListener
註解中咱們指定了2個參數:
其次,咱們自定義的監聽器還要實現RocketMQListener<T>
接口,該接口的泛型類型就是咱們生產者發送消息的消息類型,以前咱們發送的是User
對象,所以這裏也是User
對象
實現RocketMQListener
接口的onMessage
方法,方法的入參就是咱們發送出來的消息,在這個方法中咱們能夠進行本身的業務處理.
啓動服務rocketmq-consumer
,能夠看到正常消費到了消息:
以上,咱們成功的在咱們的微服務中使用RocketMQ
進行了消息的發送和消費.
不只僅是簡單的消息,RocketMQ
還支持更高級的功能,好比事務消息
、消息軌跡
等,這些高級特效咱們會下後面的進階文章中詳細講解.
在本篇博文中,咱們使用RocketMQ
官方提供的pom包進行了消息的發送和接收,也成功的在rocketmq-console
中查看到了消息.
在這個工程中,咱們接觸了不少新的概念:
以上這些概念,以及前面篇文章中遺留下來的概念,咱們將在下一篇文章中詳細介紹.
我的公衆號<橙耘自留地>日前已經開通,後續博主發佈的文章都會一併更新到公衆號,若有須要,自行查閱.
關於橙耘自留地,是我我的聚焦互聯網技術棧學習分享的一個平臺,創立之初是由於目前業內各類技術課程資料層次不齊,褒貶不一,有時候一門課花費高價買入,其實內含草包,有時偶爾低價得之,卻又大有乾貨.所以我會根據你們的意見和評價,選擇不一樣的技術棧去學習,一爲提高我本身的技術,二爲你們梳理出質量比較好的課程,以做參考.同時,相關的學習心得也會一併更新到博客和公衆號.