Spring Cloud Stream 是消息中間件組件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 爲消息中間件系統爲基礎,介紹 Spring Cloud Stream 的使用。若是你沒有用過消息中間件,能夠到 RabbitMQ 的官網看一下,或者參考這個 http://rabbitmq.mr-ping.com/。理解了消息中間件的設計,才能更好的使用它。java
一、異步處理web
好比用戶在電商網站下單,下單完成後會給用戶推送短信或郵件,發短信和郵件的過程就能夠異步完成。由於下單付款是核心業務,發郵件和短信並不屬於核心功能,而且可能耗時較長,因此針對這種業務場景能夠選擇先放到消息隊列中,有其餘服務來異步處理。spring
二、應用解耦:shell
假設公司有幾個不一樣的系統,各系統在某些業務有聯動關係,好比 A 系統完成了某些操做,須要觸發 B 系統及 C 系統。若是 A 系統完成操做,主動調用 B 系統的接口或 C 系統的接口,能夠完成功能,可是各個系統之間就產生了耦合。用消息中間件就能夠完成解耦,當 A 系統完成操做將數據放進消息隊列,B 和 C 系統去訂閱消息就能夠了。這樣各系統只要約定好消息的格式就行了。json
三、流量削峯安全
好比秒殺活動,一會兒進來好多請求,有的服務可能承受不住瞬時高併發而崩潰,因此針對這種瞬時高併發的場景,在中間加一層消息隊列,把請求先入隊列,而後再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。數據結構
四、日誌處理併發
kafka 最開始就是專門爲了處理日誌產生的。app
當碰到上面的幾種狀況的時候,就要考慮用消息隊列了。若是你碰巧使用的是 RabbitMQ 或者 kafka ,並且一樣也是在使用 Spring Cloud ,那能夠考慮下用 Spring Cloud Stream。異步
介紹下面的例子以前,假定你已經對 RabbitMQ 有必定的瞭解。
首先來認識一下 Spring Cloud Stream 中的幾個重要概念。
Destination Binders:目標綁定器,目標指的是 kafka 仍是 RabbitMQ,綁定器就是封裝了目標中間件的包。若是操做的是 kafka 就使用 kafka binder ,若是操做的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息傳遞系統和應用程序之間的橋樑,提供消息的「生產者」和「消費者」(由目標綁定器建立)
Message:一種規範化的數據結構,生產者和消費者基於這個數據結構經過外部消息系統與目標綁定器和其餘應用程序通訊。
可能看完了上面的三個概念仍然是一頭霧水,沒有關係,實踐過程當中天然就明白了。
由於用到的是 rabbitmq,因此在本地搭好 rabbitmq 環境,而後裝好 rabbitmq-management 插件,這樣就能夠訪問 web UI 界面了,默認是 15672 端口。
一、引用對應 rabbitmq 的 stream 包
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
二、在 application.yml 中增長配置
spring: profiles: stream-rabbit-customer-group1 cloud: stream: bindings: input: destination: default.messages binder: local_rabbit output: destination: default.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 32775 username: guest password: guest server: port: 8201
理解配置文件很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎麼回事了。
spring.cloud.stream.binders
,上面提到了 stream 的 3 個重要概念的第一個 「Destination binders」。上面的配置文件中就配置了一個 binder,命名爲 local_rabbit,指定 type 爲 rabbit ,表示使用的是 rabbitmq 消息中間件,若是用的是 kafka ,則 type 設置爲 kafka。environment 就是設置使用的消息中間件的配置信息,包括 host、port、用戶名、密碼等。能夠設置多了個 binder,適配不一樣的場景。
spring.cloud.stream.bindings
,對應上面提到到 「Destination Bindings」。這裏面能夠配置多個 input 或者 output,分別表示消息的接收通道和發送通道,對應到 rabbitmq 上就是不一樣的 exchange。這個配置文件裏定義了兩個input 、兩個output,名稱分別爲 input、log_input、output、log_output。這個名稱不是亂起的,在咱們的程序代碼中會用到,用來標示某個方法接收哪一個 exchange 或者發送到哪一個 exchange 。
每一個通道下的 destination 屬性指 exchange 的名稱,binder 指定在 binders 裏設置的 binder,上面配置中指定了 local_rabbit 。
能夠看到 input、output 對應的 destination 是相同的,log_input、log_output 對應的 destination 也相同, 也就是對應相同的 exchange。一個表示消息來源,一個表示消息去向。
另外還能夠設置 group 。由於服務極可能不止一個實例,若是啓動多個實例,那麼不必每一個實例都消費同一個消息,只要把功能相同的實例的 group 設置爲同一個,那麼就會只有一個實例來消費消息,避免重複消費的狀況。若是設置了 group,那麼 group 名稱就會成爲 queue 的名稱,若是沒有設置 group ,那麼 queue 就會根據 destination + 隨機字符串的方式命名。
三、接下來作一個最簡單的例子,來演示如何接收消息。
首先來介紹一下 stream 內置的簡單消息通道(消息通道也就是指消息的來源和去向)接口定義,一個 Source 和 一個 Sink 。
Source.java
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }
消息發送通道定義,定義了一個 MessageChannel 類型的 output() 方法,用 @Output
註解標示,並指定了 binding 的名稱爲 output。
Sink.java
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); }
消息接收通道定義,定義了一個 SubscribableChannel 類型的 input() 方法,表示訂閱一個消息的方法,並用 @Input
註解標識,而且指定了 binging 的名稱爲 input 。
建立一個簡單的消息接收方法:
@SpringBootApplication @EnableBinding(value = {Processor.class}) @Slf4j public class DefaultApplication { public static void main(String[] args) { SpringApplication.run(DefaultApplication.class, args); } }
在項目啓動類上加上註解 @EnableBinding(value = {Processor.class})
,代表啓用 stream ,並指定定義的 Channel 定義接口類。
而後,建立一個 service 服務類,用來訂閱消息,並對消息進行處理。
@Slf4j @Component public class DefaultMessageListener { @StreamListener(Processor.INPUT) public void processMyMessage(String message) { log.info("接收到消息:" + message); } }
在方法 processMyMessage()
上使用 @StreamListener
註解,表示對消息進行訂閱監控,指定 binding 的名稱,其中 Processor.INPUT 就是 Sink 的 input ,也就是字符串 input
,對應的上面的配置文件,就是 spring.cloud.stream.bindings.input。
啓動 DefaultApplication ,能夠在 rabbitmq 管理控制檯的 exchanges 中看到增長的這幾個 bindings 。
能夠看到 exchange 的名稱對應的就是 bindings 的兩個 input 和 兩個 output 的 destination 的值。
用 rabbitmq UI 控制檯發送消息測試
點擊上圖的 default.input.messages 進入 exchange 詳請頁面,在 publish message 部分填寫上 Payload ,而後點擊 Publish message 按鈕。
以後回到 DefaultApplication 的輸出控制檯,會看到消息已經被接收。
接下來模擬生產者和消費者處理消息的過程,模擬一個日誌處理的過程。
一、自定義消息通道接口,上面介紹了 stream 自帶的 Sink 和 Source,也僅僅能作個演示,真正的業務中仍是須要本身定義更加靈活的接口。
@Component public interface MyProcessor { String MESSAGE_INPUT = "log_input"; String MESSAGE_OUTPUT = "log_output"; String LOG_FORMAT_INPUT = "log_format_input"; String LOG_FORMAT_OUTPUT = "log_format_output"; @Input(MESSAGE_INPUT) SubscribableChannel logInput(); @Output(MESSAGE_OUTPUT) MessageChannel logOutput(); @Input(LOG_FORMAT_INPUT) SubscribableChannel logFormatInput(); @Output(LOG_FORMAT_OUTPUT) MessageChannel logFormatOutput(); }
二、建立消費者應用
配置文件以下 :
spring: profiles: stream-rabbit-customer-group1 cloud: stream: bindings: log_input: destination: kite.log.messages binder: local_rabbit group: logConsumer-group1 log_output: destination: kite.log.messages binder: local_rabbit group: logConsumer-group1 log_format_input: destination: kite.log.format.messages binder: local_rabbit group: logFormat-group1 log_format_input: destination: kite.log.format.messages binder: local_rabbit group: logFormat-group1 binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 32775 username: guest password: guest server: port: 8201
此配置文件要參照 MyProcessor 接口查看,定義了 4 個 binding,可是 destination 兩兩相同,也就是兩個 exchange。
建立 spring boot 啓動類
@SpringBootApplication @EnableBinding(value = {MyProcessor.class}) @Slf4j public class CustomerApplication { public static void main(String[] args) { SpringApplication.run(CustomerApplication.class, args); } }
用 @EnableBinding(value = {MyProcessor.class}) 註解引入 MyProcessor
建立消息接收處理服務
@Slf4j @Component public class LogMessageListener { /** * 經過 MyProcessor.MESSAGE_INPUT 接收消息 * 而後經過 SendTo 將處理後的消息發送到 MyProcessor.LOG_FORMAT_OUTPUT * @param message * @return */ @StreamListener(MyProcessor.MESSAGE_INPUT) @SendTo(MyProcessor.LOG_FORMAT_OUTPUT) public String processLogMessage(String message) { log.info("接收到原始消息:" + message); return "「" + message +"」"; } /** * 接收來自 MyProcessor.LOG_FORMAT_INPUT 的消息 * 也就是加工後的消息,也就是經過上面的 SendTo 發送來的 * 由於 MyProcessor.LOG_FORMAT_OUTPUT 和 MyProcessor.LOG_FORMAT_INPUT 是指向同一 exchange * @param message */ @StreamListener(MyProcessor.LOG_FORMAT_INPUT) public void processFormatLogMessage(String message) { log.info("接收到格式化後的消息:" + message); } }
三、建立一個消息生產者,用於發送原始日誌消息
配置文件:
spring: cloud: stream: bindings: log_output: destination: kite.log.messages binder: local_rabbit group: logConsumer-group1 binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 32775 username: guest password: guest server: port: 8202
僅僅指定了一個 binding log_output,用來發送消息,若是隻作生產者就不要指定 log_input,若是指定了 log_input ,應用就會認爲這個生產者服務也會消費消息,若是這時沒有在此服務中訂閱消息,當消息被髮送到這個服務時,由於並無訂閱消息,也就是沒有 @StreamListener 註解的方法,就會出現以下異常:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel
建立 spring boot 啓動類
@Slf4j @RestController @EnableBinding(value = {MyProcessor.class}) public class MyMessageController { @Autowired private MyProcessor myProcessor; @GetMapping(value = "sendLogMessage") public void sendLogMessage(String message){ Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build(); myProcessor.logOutput().send(stringMessage); } }
一樣的引入 @EnableBinding(value = {MyProcessor.class})
建立一個 Controller 用來發送消息
@Slf4j @RestController @EnableBinding(value = {MyProcessor.class}) public class MyMessageController { @Autowired private MyProcessor myProcessor; @GetMapping(value = "sendLogMessage") public void sendLogMessage(String message){ Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build(); myProcessor.logOutput().send(stringMessage); } }
以後,訪問連接:
http://localhost:8202/sendLogMessage?message=原始日誌
能夠在消費服務端看到以下輸出:
消息除了能夠是字符串類型,還能夠是其餘類型,也能夠是實體類型,例如
@GetMapping(value = "sendObjectLogMessage") public void sendObjectLogMessage() { LogInfo logInfo = new LogInfo(); logInfo.setClientIp("192.168.1.111"); logInfo.setClientVersion("1.0"); logInfo.setUserId("198663383837434"); logInfo.setTime(Date.from(Instant.now())); Message < LogInfo > stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(logInfo).build(); myProcessor.logOutput().send(stringMessage); }
上面代碼發送了一個 LogInfo 實體對象,在消費者端依然能夠用字符串類型接收,由於 @StreamListener 註解會默認把實體轉爲 json 字符串。
另外,能夠試着啓動兩個消費者端,把 group 設置成相同的,這時,發送的消息只會被一個消費者接收。
若是把 group 設置成不同的,那麼發送的消息會被兩個消費者接收。
還能夠看其餘 Spring Cloud 系列:
Spring Cloud Eureka 實現高可用服務發現註冊中心
Spring Cloud Config 配置中心 看這一篇就夠了
服務註冊發現、配置中心集一體的 Spring Cloud Consul
不要吝惜你的「推薦」呦
歡迎關注,不按期更新本系列和其餘文章
古時的風箏
,進入公衆號能夠加入交流羣