使用 rocketmq-spring-boot-starter 來配置、發送和消費 RocketMQ 消息

簡介:本文將 rocktmq-spring-boot 的設計實現作一個簡單的介紹,讀者能夠經過本文了解將 RocketMQ Client 端集成爲 spring-boot-starter 框架的開發細節,而後經過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發送和消費 RocketMQ 消息。git

image.png

做者 | 遼天
來源 | 阿里巴巴雲原生公衆號github

導讀:本文將 rocktmq-spring-boot 的設計實現作一個簡單的介紹,讀者能夠經過本文了解將 RocketMQ Client 端集成爲 spring-boot-starter 框架的開發細節,而後經過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發送和消費 RocketMQ 消息。web

在 Spring 生態中玩轉 RocketMQ 系列文章:spring

《如何在 Spring 生態中玩轉 RocketMQ?》
《羅美琪和春波特的故事...》
《RocketMQ-Spring 畢業兩週年,爲何能成爲 Spring 生態中最受歡迎的 messaging 實現?》
本文配套可交互教程已登陸阿里雲知行動手實驗室,PC 端登陸 start.aliyun.com 在瀏覽器中當即體驗。apache

image.png

經過本文,您將瞭解到:編程

Spring 的消息框架介紹
rocketmq-spring-boot 具體實現
使用示例設計模式

前言瀏覽器

上世紀 90 年代末,隨着 Java EE(Enterprise Edition) 的出現,特別是 Enterprise Java Beans 的使用須要複雜的描述符配置和死板複雜的代碼實現,增長了廣大開發者的學習曲線和開發成本,由此基於簡單的 XML 配置和普通 Java 對象(Plain Old Java Objects)的 Spring 技術應運而生,依賴注入(Dependency Injection), 控制反轉(Inversion of Control)和麪向切面編程(AOP)的技術更加敏捷地解決了傳統 Java 企業及版本的不足。bash

隨着 Spring 的持續演進,基於註解(Annotation)的配置逐漸取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式發佈,它基於「約定大於配置」(Convention over configuration)這一理念來快速地開發、測試、運行和部署 Spring 應用,並能經過簡單地與各類啓動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。這種簡便直接快速構建和開發應用的過程,可使用約定的配置而且簡化部署,受到愈來愈多的開發者的歡迎。服務器

Apache RocketMQ 是業界知名的分佈式消息和流處理中間件,簡單地理解,它由 Broker 服務器和客戶端兩部分組成:

其中客戶端一個是消息發佈者客戶端(Producer),它負責向 Broker 服務器發送消息;另一個是消息的消費者客戶端(Consumer),多個消費者能夠組成一個消費組,來訂閱和拉取消費 Broker 服務器上存儲的消息。

爲了利用 Spring Boot 的快速開發和讓用戶可以更靈活地使用 RocketMQ 消息客戶端,Apache RocketMQ 社區推出了 spring-boot-starter 實現。隨着分佈式事務消息功能在 RocketMQ 4.3.0 版本的發佈,近期升級了相關的 spring-boot 代碼,經過註解方式支持分佈式事務的回查和事務消息的發送。

本文將對當前的設計實現作一個簡單的介紹,讀者能夠經過本文了解將 RocketMQ Client 端集成爲 spring-boot-starter 框架的開發細節,而後經過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發送和消費 RocketMQ 消息。

Spring 中的消息框架

順便在這裏討論一下在 Spring 中關於消息的兩個主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它們都可以與 Spring Boot 整合並提供了一些參考的實現。和全部的實現框架同樣,消息框架的目的是實現輕量級的消息驅動的微服務,能夠有效地簡化開發人員對消息中間件的使用複雜度,讓系統開發人員能夠有更多的精力關注於核心業務邏輯的處理。

1. Spring Messaging

Spring Messaging 是 Spring Framework 4 中添加的模塊,是 Spring 與消息系統集成的一個擴展性的支持。它實現了從基於 JmsTemplate 的簡單的使用 JMS 接口到異步接收消息的一整套完整的基礎架構,Spring AMQP 提供了該協議所要求的相似的功能集。在與 Spring Boot 的集成後,它擁有了自動配置能力,可以在測試和運行時與相應的消息傳遞系統進行集成。

單純對於客戶端而言,Spring Messaging 提供了一套抽象的 API 或者說是約定的標準,對消息發送端和消息接收端的模式進行規定,不一樣的消息中間件提供商能夠在這個模式下提供本身的 Spring 實現:在消息發送端須要實現的是一個 XXXTemplate 形式的 Java Bean,結合 Spring Boot 的自動化配置選項提供多個不一樣的發送消息方法;在消息的消費端是一個 XXXMessageListener 接口(實現方式一般會使用一個註解來聲明一個消息驅動的 POJO),提供回調方法來監聽和消費消息,這個接口一樣可使用 Spring Boot 的自動化選項和一些定製化的屬性。

若是有興趣深刻的瞭解 Spring Messaging 及針對不一樣的消息產品的使用,推薦閱讀這個文件。參考 Spring Messaging 的既有實現,RocketMQ 的 spring-boot-starter 中遵循了相關的設計模式並結合 RocketMQ 自身的功能特色提供了相應的 API(如順序、異步和事務半消息等)。

2. Spring Cloud Stream

Spring Cloud Stream 結合了 Spring Integration 的註解和功能,它的應用模型以下:

image.png

該圖片引自 spring cloud stream

Spring Cloud Stream 框架中提供一個獨立的應用內核,它經過輸入(@Input)和輸出(@Output)通道與外部世界進行通訊,消息源端(Source)經過輸入通道發送消息,消費目標端(Sink)經過監聽輸出通道來獲取消費的消息。這些通道經過專用的 Binder 實現與外部代理鏈接。開發人員的代碼只須要針對應用內核提供的固定的接口和註解方式進行編程,而不須要關心運行時具體的 Binder 綁定的消息中間件。在運行時,Spring Cloud Stream 可以自動探測並使用在 classpath 下找到的Binder。

這樣開發人員能夠輕鬆地在相同的代碼中使用不一樣類型的中間件:僅僅須要在構建時包含進不一樣的 Binder。在更加複雜的使用場景中,也能夠在應用中打包多個 Binder 並讓它本身選擇 Binder,甚至在運行時爲不一樣的通道使用不一樣的 Binder。

Binder 抽象使得 Spring Cloud Stream 應用能夠靈活的鏈接到中間件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的靈活配置配置能力,這樣的配置能夠經過外部配置的屬性和 Spring Boot 支持的任何形式來提供(包括應用啓動參數、環境變量和 application.yml 或者 application.properties 文件),部署人員能夠在運行時動態選擇通道鏈接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。

Binder SPI 的方式來讓消息中間件產品使用可擴展的 API 來編寫相應的 Binder,並集成到 Spring Cloud Steam 環境,目前 RocketMQ 尚未提供相關的 Binder,咱們計劃在下一步將完善這一功能,也但願社區裏有這方面經驗的同窗積極嘗試,貢獻 PR 或建議。

spring-boot-starter的實現

在開始的時候咱們已經知道,spring boot starter 構造的啓動器對於使用者是很是方便的,使用者只要在 pom.xml引入starter 的依賴定義,相應的編譯,運行和部署功能就所有自動引入。所以經常使用的開源組件都會爲 Spring 的用戶提供一個 spring-boot-starter 封裝給開發者,讓開發者很是方便集成和使用,這裏咱們詳細的介紹一下 RocketMQ(客戶端)的 starter 實現過程。

1. spring-boot-starter 的實現步驟

對於一個 spring-boot-starter 實現須要包含以下幾個部分:

1)在 pom.xml 的定義

定義最終要生成的 starter 組件信息
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>

定義依賴包

它分爲兩個部分:Spring 自身的依賴包和 RocketMQ 的依賴包。

image.png

2)配置文件類

定義應用屬性配置文件類 RocketMQProperties,這個 Bean 定義一組默認的屬性值。用戶在使用最終的 starter 時,能夠根據這個類定義的屬性來修改取值,固然不是直接修改這個類的配置,而是 spring-boot 應用中對應的配置文件:src/main/resources/application.properties。

3)定義自動加載類

定義 src/resources/META-INF/spring.factories 文件中的自動加載類, 其目的是讓 spring boot 更具文中中所指定的自動化配置類來自動初始化相關的 Bean、Component 或 Service,它的內容以下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
在 RocketMQAutoConfiguration 類的具體實現中,定義開放給用戶直接使用的 Bean 對象包括:

RocketMQProperties 加載應用屬性配置文件的處理類;
RocketMQTemplate 發送端用戶發送消息的發送模板類;
ListenerContainerConfiguration 容器 Bean 負責發現和註冊消費端消費實現接口類,這個類要求:由 @RocketMQMessageListener 註解標註;實現 RocketMQListener 泛化接口。

4)最後具體地進行 RpcketMQ 相關的封裝

在發送端(producer)和消費端(consumer)客戶端分別進行封裝,在當前的實現版本提供了對 Spring Messaging 接口的兼容方式。

2. 消息發送端實現

1)普通發送端

發送端的代碼封裝在 RocketMQTemplate POJO 中,下圖是發送端的相關代碼的調用關係圖:

image.png

爲了與 Spring Messaging 的發送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象類,來支持相關的消息轉換和發送方法,這些方法最終會代理給 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如異步,單向和順序等方法直接添加到 RoketMQTempalte 中,這些方法直接代理調用到 RocketMQ 的 Producer API 來進行消息的發送。

2)事務消息發送端

對於事務消息的處理,在消息發送端進行了部分的擴展,參考上面的調用關係類圖。

RocketMQTemplate 里加入了一個發送事務消息的方法 sendMessageInTransaction(),而且最終這個方法會代理到 RocketMQ 的 TransactionProducer 進行調用,在這個 Producer 上會註冊其關聯的 TransactionListener 實現類,以便在發送消息後可以對 TransactionListener 裏的方法實現進行調用。

3. 消息消費端實現

image.png

在消費端 Spring-Boot 應用啓動後,會掃描全部包含 @RocketMQMessageListener 註解的類(這些類須要集成 RocketMQListener 接口,並實現 onMessage()方法),這個 Listener 會一對一的被放置到。

DefaultRocketMQListenerContainer 容器對象中,容器對象會根據消費的方式(併發或順序),將 RocketMQListener 封裝到具體的 RocketMQ 內部的併發或者順序接口實現。在容器中建立 RocketMQ Consumer 對象,啓動並監聽定製的 Topic 消息,若是有消費消息,則回調到 Listener 的 onMessage() 方法。

使用示例
上面的一章介紹了 RocketMQ 在 spring-boot-starter 方式的實現,這裏經過一個最簡單的消息發送和消費的例子來介紹如何使這個 rocketmq-spring-boot-starter。

1. RocketMQ 服務端的準備

1)啓動 NameServer 和 Broker

要驗證 RocketMQ 的 Spring-Boot 客戶端,首先要確保 RocketMQ 服務正確的下載並啓動。能夠參考 RocketMQ 主站的快速開始來進行操做。確保啓動 NameServer 和 Broker 已經正確啓動。

2)建立實例中所須要的 Topics

在執行啓動命令的目錄下執行下面的命令行操做:

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

2. 編譯 rocketmq-spring-boot-starter

目前的 spring-boot-starter 依賴尚未提交的 Maven 的中心庫,用戶使用前須要自行下載 git 源碼,而後執行 mvn clean install 安裝到本地倉庫。

git clone https://github.com/apache/roc...
cd rocketmq-spring-boot-starter
mvn clean install

3. 編寫客戶端代碼

用戶若是使用它,須要在消息的發佈和消費客戶端的 maven 配置文件 pom.xml 中添加以下的依賴:

image.png

屬性 spring-boot-starter-rocketmq-version 的取值爲:1.0.0-SNAPSHOT, 這與上一步驟中執行安裝到本地倉庫的版本一致。

1)消息發送端的代碼

發送端的配置文件 application.properties:

image.png

發送端的 Java 代碼:

image.png

2)消息消費端代碼

消費端的配置文件 application.properties:

image.png

消費端的 Java 代碼:

image.png

這裏只是簡單的介紹了使用 spring-boot 來編寫最基本的消息發送和接收的代碼,若是須要了解更多的調用方式,如: 異步發送,對象消息體,指定 tag 標籤以及指定事務消息,請參看 github 的說明文檔和詳細的代碼。咱們後續還會對這些高級功能進行陸續的介紹。

做者簡介
遼天,阿里巴巴技術專家,Apache RocketMQ 內核控,擁有多年分佈式系統研發經驗,對 Microservice、Messaging 和 Storage 等領域有深入理解, 目前專一 RocketMQ 內核優化以及 Messaging 生態建設。
原文連接本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索