Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.html
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.java
Spring Cloud Stream是一個用於構建與共享消息傳遞系統鏈接的高度可擴展的事件驅動型微服務的框架。git
應用程序經過inputs或outputs來與Spring Cloud Stream中binder對象交互,binder對象負責與消息中間件交互。也就是說:Spring Cloud Stream可以屏蔽底層消息中間件【RabbitMQ,kafka等】的差別,下降切換成本,統一消息的編程模型。所以,若是咱們想要使用消息驅動,咱們不須要了解各類消息中間件,咱們只須要了解Spring Cloud Stream就行了。web
SpringCloud Stream經過Spring Integration來鏈接消息代理中間件以實現消息事件驅動。spring
SpringCloud Stream還爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱,消費組,分區三個核心概念。編程
目前支持的消息中間件官網能夠查詢:像RabbitMQ,kafka都是支持的,本篇文章基於RabbitMQ消息中間件。json
標準消息隊列的特色:api
如何統一底層差別?經過定義binder綁定器做爲中間層,完美地實現了應用程序與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel通道,使得應用程序再也不須要考慮各類不一樣的消息中間件的實現。app
Stream中的消息通訊方式遵循發佈-訂閱模式,使用Topic主題進行廣播。框架
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支持RabbitMQ和Kafka |
Binder | Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder, 經過Binder能夠很方便鏈接中間件,能夠動態地改變消息類型 |
@Input | 註解標識輸入通道,經過該輸出通道接收到地消息進入應用程序 |
@Output | 註解標識輸出通道,發佈的消息將經過該通道離開應用程序 |
@StreamListener | 監聽隊列,用於消費者的隊列的消息接收 |
@EnableBinding | 指信道channel和exchange綁定在一塊兒 |
cloud-stream-rabbitmq-provider8801
,做爲生產者進行發消息模塊。cloud-stream-rabbitmq-consumer8802
,做爲消息接收模塊。cloud-stream-rabbitmq-consumer8803
,做爲消息接收模塊。<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
其實沒有涉及到服務註冊發現,但爲了完整性,仍是將該服務註冊進服務註冊中心的配置加上。
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: [hostname] port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次爲json,文本則設置「text/plain」 binder: defaultRabbit # 設置要綁定的消息服務的具體設置 eureka: client: # 客戶端進行Eureka註冊的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 若是如今超過了5秒的間隔(默認是90秒) instance-id: send-8801.com # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變爲IP地址
@SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
@EnableBinding(Source.class) //定義消息的推送管道 public class MessageProviderImpl implements IMessageProvider { /* * 消息發送管道 */ @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("==> serial + " + serial); return null; } }
@RestController public class SendMessageController { @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); } }
啓動7001註冊中心,啓動8801生產者模塊,接着登陸rabbitMQ的web圖形化界面,咱們將會看到一個新建的exchange:studyExchange,這是咱們在yml中配置的。
接着訪問接口,localhost:8801/sendMessage
,控制檯輸出serial不斷,rabbitMQ中也成功發送消息,測試成功。
一樣引入spring-cloud-starter-stream-rabbit
依賴。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: [hostname] port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次爲對象json,若是是文本則設置「text/plain」 binder: defaultRabbit # 設置要綁定的消息服務的具體設置 eureka: client: # 客戶端進行Eureka註冊的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 若是如今超過了5秒的間隔(默認是90秒) instance-id: receive-8802.com # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變爲IP地址
@SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }
@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費者1號,----->接受到的消息: " + message.getPayload() + "\t port: " + serverPort); } }
在剛剛兩個服務啓動以後,再啓動剛剛建立的8802模塊。
訪問:localhost:8801/sendMessage
,8802模塊控制檯成功打印消息。
按照上面的步驟運行下來,貌似沒什麼問題,其實並非?
爲了演示這個問題,咱們仿照8802模塊,克隆一份8803模塊,並依次啓動7001註冊中心,8801消息生產者,880二、8803消息消費者。
咱們只需訪問:localhost:8801/sendMessage
就可以顯示消息重複消費的問題,對此,咱們能夠經過Stream中的分組消費來解決,同一個組的消費者存在競爭關係,只能有一個能夠進行消費。
咱們經過rabbitMQ的管理頁面就能看到,這兩個消費者默認的組流水號是不一樣的,解決的辦法也很簡單,指定他們的流水號相同便可。
咱們在8802和8803中的yml中配置:spring.cloud.stream.bindings.input.group=A
便可實現輪詢消費。
消息持久化是很關鍵的步驟,若是不具有消息持久化的功能,假設某一消費者忽然宕機,生產者持續發送消息,消費者沒法消費,會致使消息丟失。
加上group屬性以後,就已經具有了消息持久化,演示也很簡單,關閉消費者服務,生產者不斷髮送信息,重啓消費者服務,發現啓動以後,將宕機時的錯過的消息消費。
本系列文章爲《尚硅谷SpringCloud教程》的學習筆記【版本稍微有些不一樣,後續遇到bug再作相關說明】,主要作一個長期的記錄,爲之後學習的同窗提供示例,代碼同步更新到Gitee:https://gitee.com/tqbx/spring-cloud-learning,而且以標籤的形式詳細區分每一個步驟,這個系列文章也會同步更新。