Spring Cloud Stream 的幾個概念java
Spring Cloud Stream is a framework for building message-driven microservice applications.spring
官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。express
須要JAVA Spring Cloud大型企業分佈式微服務雲構建的B2B2C電子商務平臺源碼 一零三八七七四六二六bash
應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,經過咱們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與中間件交互。因此,咱們只須要搞清楚如何與 Spring Cloud Stream 交互就能夠方便使用消息驅動的方式 。app
Binder框架
Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與消息中間件之間的粘合劑。目前 Spring Cloud Stream 實現了 Kafka 和 Rabbit MQ 的binder。 經過 binder ,能夠很方便的鏈接中間件,能夠動態的改變消息的 destinations(對應於 Kafka 的topic,Rabbit MQ 的 exchanges),這些均可以經過外部配置項來作到。 甚至能夠任意的改變中間件的類型而不須要修改一行代碼。分佈式
Publish-Subscribeide
消息的發佈(Publish)和訂閱(Subscribe)是事件驅動的經典模式。Spring Cloud Stream 的數據交互也是基於這個思想。生產者把消息經過某個 topic 廣播出去(Spring Cloud Stream 中的 destinations)。其餘的微服務,經過訂閱特定 topic 來獲取廣播出來的消息來觸發業務的進行。 這種模式,極大的下降了生產者與消費者之間的耦合。即便有新的應用的引入,也不須要破壞當前系統的總體結構。微服務
Consumer Groupsui
「Group」,若是使用過 Kafka 的童鞋並不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。 微服務中動態的縮放同一個應用的數量以此來達到更高的處理能力是很是必須的。對於這種狀況,同一個事件防止被重複消費,只要把這些應用放置於同一個 「group」 中,就可以保證消息只會被其中一個應用消費一次。 Durability 消息事件的持久化是必不可少的。Spring Cloud Stream 能夠動態的選擇一個消息隊列是持久化,仍是 present。 Bindings bindings 是咱們經過配置把應用和spring cloud stream 的 binder 綁定在一塊兒,以後咱們只須要修改 binding 的配置來達到動態修改topic、exchange、type等一系列信息而不須要修改一行代碼。 基於 RabbitMQ 使用
消息接收 Spring Cloud Stream 基本用法,須要定義一個接口,以下是內置的一個接口。
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
複製代碼
註釋__ @Input__ 對應的方法,須要返回 __ SubscribableChannel __ ,而且參入一個參數值。
這就接口聲明瞭一個__ binding __命名爲 「input」 。
其餘內容經過配置指定:
spring:
cloud:
stream:
bindings:
input:
destination: mqTestDefault
複製代碼
destination:指定了消息獲取的目的地,對應於MQ就是 exchange,這裏的exchange就是 mqTestDefault
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 監聽 binding 爲 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("通常監聽收到:" + message.getPayload());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
複製代碼
定義一個 class (這裏直接在啓動類),而且添加註解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同時定義一個方法(此處是 input)標明註解爲 __ @StreamListener(Processor.INPUT) __,方法參數爲 Message 。
啓動後,默認是會建立一個臨時隊列,臨時隊列綁定的exchange爲 「mqTestDefault」,routing key爲 「#」。 全部發送 exchange 爲「mqTestDefault」 的MQ消息都會被投遞到這個臨時隊列,而且觸發上述的方法。 以上代碼就完成了最基本的消費者部分。
消息發送
消息的發送同消息的接受,都須要定義一個接口,不一樣的是接口方法的返回對象是 MessageChannel,下面是 Spring Cloud Stream 內置的接口:
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
複製代碼
這就接口聲明瞭一個 binding 命名爲 「output」 ,不一樣於上述的 「input」,這個binding 聲明瞭一個消息輸出流,也就是消息的生產者。
spring:
cloud:
stream:
bindings:
output:
destination: mqTestDefault
contentType: text/plain
複製代碼
destination:指定了消息發送的目的地,對應 RabbitMQ,會發送到 exchange 是 mqTestDefault 的全部消息隊列中。
代碼中調用:
@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
@Autowired
@Qualifier("output")
MessageChannel output;
@Override
public void run(String... strings) throws Exception {
// 字符串類型發送MQ
System.out.println("字符串信息發送");
output.send(MessageBuilder.withPayload("你們好").build());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
複製代碼
經過注入MessageChannel的方式,發送消息。
以上代碼就完成了最基本的生產者部分。
自定義消息發送接收
自定義接口
Spring Cloud Stream 內置了兩種接口,分別定義了 binding 爲 「input」 的輸入流,和 「output」 的輸出流,而在咱們實際使用中,每每是須要定義各類輸入輸出流。使用方法也很簡單。
interface OrderProcessor {
String INPUT_ORDER = "inputOrder";
String OUTPUT_ORDER = "outputOrder";
@Input(INPUT_ORDER)
SubscribableChannel inputOrder();
@Output(OUTPUT_ORDER)
MessageChannel outputOrder();
}
複製代碼
一個接口中,能夠定義無數個輸入輸出流,能夠根據實際業務狀況劃分。上述的接口,定義了一個訂單輸入,和訂單輸出兩個 binding。
使用時,須要在 @EnableBinding 註解中,添加自定義的接口。 使用 @StreamListener 作監聽的時候,須要指定 OrderProcessor.INPUT_ORDER
spring:
cloud:
stream:
defaultBinder: defaultRabbit
bindings:
inputOrder:
destination: mqTestOrder
outputOrder:
destination: mqTestOrder
複製代碼
如上配置,指定了 destination 爲 mqTestOrder 的輸入輸出流。
分組與持久化
上述自定義的接口配置中,Spring Cloud Stream 會在 RabbitMQ 中建立一個臨時的隊列,程序關閉,對應的鏈接關閉的時候,該隊列也會消失。而在實際使用中,咱們須要一個持久化的隊列,而且指定一個分組,用於保證應用服務的縮放。
只須要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = XXX 。對應的隊列就是持久化,而且名稱爲:mqTestOrder XXX。
rabbitMQ routing key 綁定
用慣了 rabbitMQ 的童鞋,在使用的時候,發現 Spring Cloud Stream 的消息投遞,默認是根據 destination + group 進行區分,全部的消息都投遞到 routing key 爲 「#‘’ 的消息隊列裏。 若是咱們須要進一步根據 routing key 來進行區分消息投遞的目的地,或者消息接受,須要進一步配,Spring Cloud Stream 也提供了相關配置:
spring:
cloud:
stream:
bindings:
inputProductAdd:
destination: mqTestProduct
group: addProductHandler # 擁有 group 默認會持久化隊列
outputProductAdd:
destination: mqTestProduct
rabbit:
bindings:
inputProductAdd:
consumer:
bindingRoutingKey: addProduct.* # 用來綁定消費者的 routing key
outputProductAdd:
producer:
routing-key-expression: '''addProduct.*''' # 須要用這個來指定 RoutingKey
複製代碼
spring.cloud.stream.rabbit.bindings.[channelName].consumer.bindingRoutingKey 指定了生成的消息隊列的routing key
spring.cloud.stream.rabbit.bindings.[channelName].producer.routing-key-expression 指定了生產者消息投遞的routing key
DLX 隊列
DLX 做用
DLX:Dead-Letter-Exchange(死信隊列)。利用DLX, 當消息在一個隊列中變成死信(dead message)以後,它能被從新publish到另外一個Exchange,這個Exchange就是DLX。消息變成死信一貫有一下幾種狀況: 消息被拒絕(basic.reject/ basic.nack)而且requeue=false
隊列達到最大長度
DLX也是一個正常的Exchange,和通常的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息從新發布到設置的Exchange上去,進而被路由到另外一個隊列,能夠監聽這個隊列中消息作相應的處理。
結論 Spring Cloud Stream 最大的方便之處,莫過於抽象了事件驅動的一些概念,對於消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件,切換topic。使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程。java B2B2C Springcloud多租戶電子商城系統