以前在寫Spring Boot基礎教程的時候寫過一篇《Spring Boot中使用RabbitMQ》。在該文中,咱們經過簡單的配置和註解就能實現向RabbitMQ中生產和消費消息。實際上咱們使用的對RabbitMQ的starter就是經過Spring Cloud Stream中對RabbitMQ的支持來實現的。下面咱們就經過本文來了解一下Spring Cloud Stream。java
Spring Cloud Stream是一個用來爲微服務應用構建消息驅動能力的框架。它能夠基於Spring Boot來建立獨立的、可用於生產的Spring應用程序。它經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動的微服務應用。Spring Cloud Stream爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,而且引入了發佈-訂閱、消費組以及消息分區這三個核心概念。簡單的說,Spring Cloud Stream本質上就是整合了Spring Boot和Spring Integration,實現了一套輕量級的消息驅動的微服務框架。經過使用Spring Cloud Stream,能夠有效地簡化開發人員對消息中間件的使用複雜度,讓系統開發人員能夠有更多的精力關注於核心業務邏輯的處理。因爲Spring Cloud Stream基於Spring Boot實現,因此它秉承了Spring Boot的優勢,實現了自動化配置的功能幫忙咱們能夠快速的上手使用,可是目前爲止Spring Cloud Stream只支持下面兩個著名的消息中間件的自動化配置:git
RabbitMQ
Kafka
下面咱們經過構建一個簡單的示例來對Spring Cloud Stream有一個初步認識。該示例主要目標是構建一個基於Spring Boot的微服務應用,這個微服務應用將經過使用消息中間件RabbitMQ來接收消息並將消息打印到日誌中。因此,在進行下面步驟以前請先確認已經在本地安裝了RabbitMQ,具體安裝步驟請參考此文。github
stream-hello
pom.xml
中的依賴關係,引入Spring Cloud Stream對RabbitMQ的支持,具體以下:<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
SinkReceiver
,具體以下:@EnableBinding(Sink.class) public class SinkReceiver { private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(Object payload) { logger.info("Received: " + payload); } }
@SpringBootApplication public class SinkApplication { public static void main(String[] args) { SpringApplication.run(SinkApplication.class, args); } }
到這裏,咱們快速入門示例的編碼任務就已經完成了。下面咱們分別啓動RabbitMQ以及該Spring Boot應用,而後作下面的試驗,看看它們是如何運做的。spring
... INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3c78e551 [delegate=amqp://guest@127.0.0.1:5672/] INFO 16272 --- [main] o.s.integration.channel.DirectChannel : Channel 'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge' has 1 subscriber(s). INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A ...
從上面的日誌內容中,咱們能夠得到如下信息:bash
guest
用戶建立了一個指向127.0.0.1:5672
位置的RabbitMQ鏈接,在RabbitMQ的控制檯中咱們也能夠發現它。
input.anonymous.Y8VsFILmSC27eS5StsXp6A
的隊列,並經過RabbitMessageChannelBinder
將本身綁定爲它的消費者。這些信息咱們也能在RabbitMQ的控制檯中發現它們。
下面咱們能夠在RabbitMQ的控制檯中進入input.anonymous.Y8VsFILmSC27eS5StsXp6A
隊列的管理頁面,經過Publish Message
功能來發送一條消息到該隊列中。框架
此時,咱們能夠在當前啓動的Spring Boot應用程序的控制檯中看到下面的內容:spring-boot
INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication : Received: [B@7cba610e
咱們能夠發如今應用控制檯中輸出的內容就是SinkReceiver
中receive
方法定義的,而輸出的具體內容則是來自消息隊列中獲取的對象。這裏因爲咱們沒有對消息進行序列化,因此輸出的只是該對象的引用,在後面的小節中咱們會詳細介紹接收消息後的處理。微服務
在順利完成上面快速入門的示例後,咱們簡單解釋一下上面的步驟是如何將咱們的Spring Boot應用鏈接上RabbitMQ來消費消息以實現消息驅動業務邏輯的。單元測試
首先,咱們對Spring Boot應用作的就是引入spring-cloud-starter-stream-rabbit
依賴,該依賴包是Spring Cloud Stream對RabbitMQ支持的封裝,其中包含了對RabbitMQ的自動化配置等內容。從下面它定義的依賴關係中,咱們還能夠知道它等價於spring-cloud-stream-binder-rabbit
依賴。測試
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> </dependencies>
接着,咱們再來看看這裏用到的幾個Spring Cloud Stream的核心註解,它們都被定義在SinkReceiver
中:
@EnableBinding
,該註解用來指定一個或多個定義了@Input
或@Output
註解的接口,以此實現對消息通道(Channel)的綁定。在上面的例子中,咱們經過@EnableBinding(Sink.class)
綁定了Sink
接口,該接口是Spring Cloud Stream中默認實現的對輸入消息通道綁定的定義,它的源碼以下:public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
它經過@Input
註解綁定了一個名爲input
的通道。除了Sink
以外,Spring Cloud Stream還默認實現了綁定output
通道的Source
接口,還有結合了Sink
和Source
的Processor
接口,實際使用時咱們也能夠本身經過@Input
和@Output
註解來定義綁定消息通道的接口。當咱們須要爲@EnableBinding
指定多個接口來綁定消息通道的時候,能夠這樣定義:@EnableBinding(value = {Sink.class, Source.class})
。
@StreamListener
:該註解主要定義在方法上,做用是將被修飾的方法註冊爲消息中間件上數據流的事件監聽器,註解中的屬性值對應了監聽的消息通道名。在上面的例子中,咱們經過@StreamListener(Sink.INPUT)
註解將receive
方法註冊爲對input
消息通道的監聽處理器,因此當咱們在RabbitMQ的控制頁面中發佈消息的時候,receive
方法會作出對應的響應動做。上面咱們經過RabbitMQ的控制檯完成了發送消息來驗證了消息消費程序的功能,雖然這種方法比較low,可是經過上面的步驟,相信你們對RabbitMQ和Spring Cloud Stream的消息消費已經有了一些基礎的認識。下面咱們經過編寫生產消息的單元測試用例來完善咱們的入門內容。
@RunWith(SpringRunner.class) @EnableBinding(value = {SinkApplicationTests.SinkSender.class}) public class SinkApplicationTests { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build()); } public interface SinkSender { String OUTPUT = "input"; @Output(SinkSender.OUTPUT) MessageChannel output(); } }
INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver : Received: produce a message :http://blog.didispace.com
在上面的單元測試中,咱們經過@Output(SinkSender.OUTPUT)
定義了一個輸出經過,而該輸出通道的名稱爲input
,與前文中的Sink中定義的消費通道同名,因此這裏的單元測試與前文的消費者程序組成了一對生產者與消費者。到這裏,本文的內容就次結束,若是您可以獨立的完成上面的例子,那麼對於Spring Cloud Stream的基礎使用算是入門了。可是,Spring Cloud Stream的使用遠不止於此,在近期的博文中,我講繼續更新這部份內容,幫助他們來理解和用好Spring Cloud Stream來構建消息驅動的微服務!
本文完整實例:
若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!
本文內容部分節選自個人《Spring Cloud微服務實戰》,但對依賴的Spring Boot和Spring Cloud版本作了升級。
本文首發於個人博客: http://blog.didispace.com