消息驅動式微服務:Spring Cloud Stream & RabbitMQ

1. 概述

在本文中,咱們將向您介紹Spring Cloud Stream,這是一個用於構建消息驅動的微服務應用程序的框架,這些應用程序由一個常見的消息傳遞代理(如RabbitMQApache Kafka等)鏈接。html

Spring Cloud Stream構建在現有Spring框架(如Spring MessagingSpring Integration)之上。儘管這些框架通過了實戰測試,工做得很是好,可是實現與使用的message broker緊密耦合。此外,有時對某些用例進行擴展是困難的。java

Spring Cloud Stream背後的想法是一個很是典型的Spring Boot概念——抽象地講,讓Spring根據配置和依賴關係管理在運行時找出實現自動注入。這意味着您能夠經過更改依賴項和配置文件來更改message broker。能夠在這裏找到目前已經支持的各類消息代理。git

本文將使用RabbitMQ做爲message broker。在此以前,讓咱們瞭解一下broker(代理)的一些基本概念,以及爲何要在面向微服務的體系架構中須要它。github

2. 微服務中的消息

在微服務體系架構中,咱們有許多相互通訊以完成請求的小型應用程序—它們的主要優勢之一是改進了的可伸縮性。一個請求從多個下游微服務傳遞到完成是很常見的。例如,假設咱們有一個Service-A內部調用Service-BService-C來完成一個請求:
[外鏈圖片轉存失敗(img-jzvHHRXw-1562549429195)(https://user-gold-cdn.xitu.io/2019/7/7/16bccd47c4051b28?w=511&h=347&f=png&s=11713)]算法

是的,還會有其餘組件,好比Spring Cloud EurekaSpring Cloud Zuul等等,但咱們仍是專一關心這類架構的特有問題。spring

假設因爲某種緣由Service-B須要更多的時間來響應。也許它正在執行I/O操做或長時間的DB事務,或者進一步調用其它致使Service-B變得更慢的服務,這些都使其沒法更具效率。apache

如今,咱們能夠啓動更多的Service-B實例來解決這個問題,這樣很好,可是Service-A其實是響應很快的,它須要等待Service-B的響應來進一步處理。這將致使Service-A沒法接收更多的請求,這意味着咱們還必須啓動Service-A的多個實例。架構

另外一種方法解決相似狀況的是使用事件驅動的微服務體系架構。這基本上意味着Service-A不直接經過HTTP調用Service-BService-C,而是將請求或事件發佈給message broker(消息代理)。Service-BService-C將成爲message broker(消息代理)上此事件的訂閱者。
image併發

與依賴HTTP調用的傳統微服務體系架構相比,這有許多優勢:app

  • 提升可伸縮性和可靠性——如今咱們知道哪些服務是整個應用程序中的真正瓶頸。
  • 鼓勵鬆散耦合——Service-A不須要了解Service-BService-C。它只須要鏈接到message broker併發布事件。事件如何進一步編排取決於代理設置。經過這種方式,Service-A能夠獨立地運行,這是微服務的核心概念之一。
  • 與遺留系統交互——一般咱們不能將全部東西都移動到一個新的技術堆棧中。咱們仍然必須使用遺留系統,雖然速度很慢,可是很可靠。

3. RabbitMQ

高級消息隊列協議(AMQP)RabbitMQ用於消息傳遞的協議。雖然RabbitMQ支持其餘一些協議,可是AMQP因爲兼容性和它提供的大量特性而更受歡迎。

3.1 RabbitMQ架構設計

image

所以發佈者將消息發佈到RabbitMQ中稱爲Exchange(交換器)。Exchange(交換器)接收消息並將其路由到一個或多個Queues(隊列)。路由算法依賴於Exchange(交換器)類型和routing(路由)key/header(與消息一塊兒傳遞)。將Exchange(交換器)鏈接到Queues(隊列)的這些規則稱爲bindings(綁定)。

綁定能夠有4種類型:

  • Direct: 它根據routing key(路由鍵)將Exchange(交換器)類型直接路由到特定的Queues(隊列)。
  • Fanout:它將消息路由到綁定Exchange(交換器)中的全部Queues(隊列)。
  • Topic:它根據徹底匹配或部分據routing key(路由鍵)匹配將消息路由到(0、1或更多)的Queues(隊列)。
  • Headers:它相似於Topic(主題)交換類型,可是它是基routing header(路由頭)而不是routing key(路由鍵)來路由的。

image

經過Exchange(交換器)和Queues(隊列)發佈和消費消息的整個過程是經過一個Channel(通道)完成的。

有關路由的詳細信息,請訪問此連接

3.2 RabbitMQ 設置

3.2.1 安裝

咱們能夠從這裏下載並安裝基於咱們的操做系統的二進制文件。

然而,在本文中,咱們將使用cloudamqp.com提供的免費雲安裝。只需註冊服務並登陸便可。

在主儀表板上單擊建立新實例:

image

而後給你的實例起個名字,而後進入下一步:

image

而後選擇一個可用區:

image

最後,查看實例信息,點擊右下角的建立實例:

image

就是這樣。如今在雲中運行了一個RabbitMQ實例。有關實例的更多信息,請轉到您的儀表板並單擊新建立的實例:

image

咱們能夠看到咱們能夠訪問RabbitMQ實例的主機,好比從咱們的項目鏈接所需的用戶名和密碼:

image

咱們將在Spring應用程序中使用AMQP URL鏈接到這個實例,因此請在某個地方記下它。

您還能夠經過單擊左上角的RabbitMQ manager來查看管理器控制檯。這將採用它來管理的您的RabbitMQ實例。

image

Project 配置

如今咱們的設置已經準備好了,讓咱們建立咱們的服務:

  • cloud-stream-producer-rabbitmq: 做爲一個發佈者,將消息推送到RabbitMQ
  • cloud-stream-consumer-rabbitmq: 消費者消費消息

使用Spring Initializr建立一個腳手架項目。這將是咱們的producer項目,咱們將使用REST端點發布消息。

選擇您喜歡的Spring Boot版本,添加WebCloud Stream依賴項,生成Maven項目:
image

注意:

請注意cloud-stream依賴項。這也須要像RabbitMQKafka等綁定器依賴項才能工做。

因爲咱們將使用RabbitMQ,添加如下Maven依賴項:

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> 

或者,咱們也能夠將二者結合起來使用spring-cloud-starter-stream-rabbit:

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 

使用一樣的方法,建立消費者項目,但僅使用spring-cloud-starter-stream-rabbit依賴項。

4. 建立生產者

如前所述,將消息從發佈者傳遞到隊列的整個過程是經過通道完成的。所以,讓咱們建立一個HelloBinding接口,其中包含咱們的消息機制greetingChannel:

interface HelloBinding { @Output("greetingChannel") MessageChannel greeting(); } 

由於這將發佈消息,因此咱們使用@Output註解。方法名能夠是咱們想要的任意名稱,固然,咱們能夠在一個接口中有多個Channel(通道)。

如今,讓咱們建立一個REST,它將消息推送到這個Channel(通道)

@RestController public class ProducerController { private MessageChannel greet; public ProducerController(HelloBinding binding) { greet = binding.greeting(); } @GetMapping("/greet/{name}") public void publish(@PathVariable String name) { String greeting = "Hello, " + name + "!"; Message<String> msg = MessageBuilder.withPayload(greeting) .build(); this.greet.send(msg); } } 

上面,咱們建立了一個ProducerController類,它有一個MessageChannel類型的屬性 greet。這是經過咱們前面聲明的方法在構造函數中初始化的。

注意: 咱們能夠用簡潔的方式作一樣的事情,可是咱們使用不一樣的名稱來讓您更清楚地瞭解事物是如何鏈接的。

而後,咱們有一個簡單的REST接口,它接收PathVariablename,並使用MessageBuilder建立一個String類型的消息。最後,咱們使用MessageChannel上的.send()方法來發布消息。

如今,咱們將在的主類中添加@EnableBinding註解,傳入HelloBinding告訴Spring加載。

@EnableBinding(HelloBinding.class) @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 

image

最後,咱們必須告訴Spring如何鏈接到RabbitMQ(經過前面的AMQP URL),並將greetingChannel鏈接到一可用的消費者。

這兩個都是在application.properties中定義的:

spring.rabbitmq.addresses=<amqp url>

spring.cloud.stream.bindings.greetingChannel.destination = greetings

server.port=8080

5. 建立消費者

如今,咱們須要監聽以前建立的通道greetingChannel。讓咱們爲它建立一個綁定:

public interface HelloBinding { String GREETING = "greetingChannel"; @Input(GREETING) SubscribableChannel greeting(); } 

與生產者綁定的兩個很是明顯區別。由於咱們正在消費消息,因此咱們使用SubscribableChannel@Input註解鏈接到greetingChannel,消息數據將被推送這裏。

如今,讓咱們建立處理數據的方法:

@EnableBinding(HelloBinding.class) public class HelloListener { @StreamListener(target = HelloBinding.GREETING) public void processHelloChannelGreeting(String msg) { System.out.println(msg); } } 

在這裏,咱們建立了一個HelloListener類,在processHelloChannelGreeting方法上添加@StreamListener註解。這個方法須要一個字符串做爲參數,咱們剛剛在控制檯打印了這個參數。咱們還在類添加@EnableBinding啓用了HelloBinding

一樣,咱們在這裏使用@EnableBinding,而不是主類,以便告訴咱們如何使用。

看看咱們的主類,咱們沒有任何修改:

@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 

application.properties配置文件中,咱們須要定義與生產者同樣的屬性,除了修改端口以外

spring.rabbitmq.addresses=<amqp url>  
spring.cloud.stream.bindings.greetingChannel.destination=greetings  
server.port=9090

6. 所有測試

讓咱們同時啓動生產者和消費者服務。首先,讓咱們經過點擊端點http://localhost:8080/greet/john來生產消息。

在消費者日誌中看到消息內容:
image

咱們使用如下命令啓動另外一個消費者服務實例(在另外一個端口(9091)上):

$ mvn spring-boot:run -Dserver.port=9091

如今,當咱們點擊生產者REST端點生產消息時,咱們看到兩個消費者都收到了消息:

image

這多是咱們在一些用例中想要的。可是,若是咱們只想讓一個消費者消費一條消息呢?爲此,咱們須要在application.properties中建立一個消費者組。消費者的配置文件:

spring.cloud.stream.bindings.greetingChannel.group = greetings-group

如今,再次在不一樣的端口上運行消費者的2個實例,並經過生產者生產消息再次查看:

image

這一切也能夠在RabbitMQ管理器控制檯看到:

image

image

7. 結論

在本文中,咱們解釋了消息傳遞的主要概念、它在微服務中的角色以及如何使用Spring Cloud Stream實現它。咱們使用RabbitMQ做爲消息代理,可是咱們也可使用其餘流行的代理,比如Kafka,只需更改配置和依賴項。

與往常同樣,本文使用的示例代碼能夠在GitHub得到完整的源代碼

相關文章
相關標籤/搜索