前言: 本文做者張天,節選自筆者與其合著的《Spring Cloud微服務架構進階》,即將在八月出版問世。本文將其中Spring Cloud Stream應用與自定義Rocketmq Binder的內容抽取出來,主要介紹Spring Cloud Stream的相關概念,並概述相關的編程模型。前端
Spring Cloud Stream 是一個用來爲微服務應用構建消息驅動能力的框架。它能夠基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程序。他經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。Spring Cloud Stream目前僅支持RabbitMQ、Kafka。java
消息隊列中間件是分佈式系統中最爲重要的組件之一,主要解決應用耦合,異步消息,流量削鋒等問題,是大型分佈式系統不可缺乏的中間件。消息隊列技術是分佈式應用間交換信息的一種技術,消息可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。經過消息隊列,應用程序能夠相對獨立地執行,它們不須要知道彼此的位置,只須要處理從消息隊列發送來的消息和向消息隊列發送消息。c++
消息隊列的主要特色是異步處理和解耦。其主要的使用場景就是將比較耗時並且不須要同步返回結果的操做做爲消息放入消息隊列。同時因爲使用了消息隊列,只要保證消息格式不變,消息的發送方和接受者並不須要彼此聯繫,也不須要受對方的影響,即解耦。web
消息隊列的使用場景有:編程
在軟件的正常功能開發過程當中,開發人員並不須要去刻意的尋找消息隊列的使用場景,而是當出現性能瓶頸時,去查看業務邏輯是否存在能夠異步處理的耗時操做,若是存在的話即可以引入消息隊列來解決。不然盲目的使用消息隊列可能會增長維護和開發的成本卻沒法獲得可觀的性能提高,那就得不償失了。json
目前業界有四款經常使用的消息隊列,它們分別是RabbitMQ、RocketMQ、ActiveMQ和Kafka。咱們這裏主要介紹前兩種。安全
RabbitMQ在2007年發佈,是一個在AMQP(高級消息隊列協議)基礎上完成的,可複用的企業消息系統,是當前最流行的消息中間件之一。 RabbitMQ的主要特性有:服務器
RabbitMQ的優勢有:微信
RabbitMQ的缺點有:架構
RocketMQ出自阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並作出了本身的一些改進,消息可靠性上比 Kafka 更好。RocketMQ在阿里集團被普遍應用在訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等場景。
RocketMQ的主要特性有:
RocketMQ的缺點有:
如圖是Stream源碼的流程圖。Stream首先會動態註冊相關BeanDefinition,而且處理@StreamListener註解;而後在Bean實例初始化以後,會調用BindingService進行服務綁定;BindingService在綁定服務時會首先獲取特定的Binder綁定器,而後綁定Producer和Consumer;最後Stream的相關實例就會進行發送和接受消息的處理。
Spring Cloud Stream提供了一系列的預先定義的註解來聲明輸入型和輸出型channel,業務系統基於這些channel與消息中間件進行通訊,而不是直接與消息中間件進行通訊。
經過給業務應用的配置類添加@EnableBinding
註解來將一個Spring應用轉變成Spring Cloud Stream應用。@EnableBinding
註解自己擁有@Configuration
元註解來進行相關配置而且會觸發Spring Cloud Stream框架的初始化機制。
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class<?>[] value() default {};
}
複製代碼
@EnableBinding
註解可使用聲明輸入型和輸出行channel的接口類做爲其value屬性值。@EnableBinding
註解只能使用在業務系統的Configuration類上,能夠提供儘量多的接口類做爲該註解的value屬性值,好比說@EnableBinding(value={Order.class, Payment.class})
,Order和Payment都是聲明瞭channel的接口類。 在Spring Cloud Stream應用中,接口類能夠經過被@Input
和@Output
註解修飾的函數來聲明的輸入型和輸出型channels。
public interface OnlineStore{
@Input
SubscribableChannel orders(); #聲明輸入型channel,表示接收訂單
@Output
MessageChannel stock(); #聲明輸出型channel,表示向供應商進貨
}
複製代碼
使用這個接口類看成@EnableBinding
的value屬性值能夠觸發Stream框架的初始化機制,建立兩個channel,名字分別爲orders和stock,orders是輸入型channel,而stock是輸出型channel。
@EnableBinding(OnlineStore.class)
public class ShopConfiguration {
...
}
複製代碼
使用@Input
和@Output
註解,編程人員能夠給每一個信道一個自定義的名稱,使用這個自定義信道,能夠與消息對立中相應的Channel進行交互。
public interface OnlineStore{
@Input("inboundOrders")
SubscribableChannel orders();
}
複製代碼
在上邊代碼示例中,自定義信道的名稱爲inboundOrders,Stream框架會建立出名爲inboundOrders的信道。
Spring Cloud Stream提供了預先設置的三種接口來定義輸入型channel和輸出型channel,它們是Source、Sink和Processor。Source用來聲明輸出型channel,它的信道名稱爲output。Sink用來聲明輸入型channel,它的信道名稱爲input。Processor則用來聲明輸出輸入型的channel。
# Source
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
# Sink
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
# Processor
public interface Processor extends Source, Sink {
}
複製代碼
使用Spring Integration註解或者Spring Cloud Stream的@StreamListener註解能夠進行消息的發送和消費。@StreamListener
註解基於Spring Messaging註解(好比說@MessageMapping,@JmsListener,@RabbitListener),除此以外,該註解添加了內容(content)類型管理和類型強制等特性。
做爲Spring Integration的補充,Spring Cloud Stream提供了它本身的@StreamListener註解,該註解構建在Spring Messaging註解的基礎上,好比說@MessageMapping、@JmsListener和@RabbitListener
。@StreamListener
註解提供了更加簡便處理輸入消息的模型。
Spring Cloud Stream提供了可擴展的消息轉換(MessageConverter)機制來處理數據轉換,並將轉換後的數據分配給對應的被@StreamListener
修飾的方法。下面這個例子展現了一個處理外部訂單消息的應用。
@EnableBinding(Sink.class)
public class OrderHandler {
@Autowired
OrderService orderService;
@StreamListener(Sink.INPUT)
public void handle(Order order) {
orderService.handle(order);
}
}
複製代碼
假設,輸入的Message對象有一個string類型的Payload和一個值爲application/json的contentType。在使用@StreamListener
時,MessageConverter
會使用消息的contentType來解析String類型的Payload並賦值給Order對象。 就像其餘的Spring Messaging方法同樣,被@StreamListener
註解的方法的參數可使用@Payload
和@Headers
進行註解。對於返回數據的方法,必須使用@SendTo
註解來指定該返回數據發送到哪一個輸出型channel。
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}
複製代碼
Spring Cloud Stream支持將消息分配到多個@StreamListener
修飾的方法。爲了能使用該分配機制,一個方法必須首先知足下列條件:
使用註解的condition屬性中的SpEL表達式能夠設置@StreamListener
接收消息的條件判斷。全部匹配了該condition的方法都會在同一個線程中被調用,可是方法調用相對順序不能保證。
下面就是一個@StreamListener
分配消息的例子。在這個例子中,全部頭部屬性type對應的值爲food的消息都會被分配給receiveFoodOrder方法,全部頭部屬性type對應的值爲compute的消息都會被分配給receiveComputeOrder方法。
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='food'")
public void receiveFoodOrder(@Payload FoodOrder foodOrder) {
// handle the message
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='compute'")
public void receiveComputeOrder(@Payload ComputeOrder computeOrder) {
// handle the message
}
}
複製代碼
本文主要介紹了Spring Cloud Stream中涉及到的相關概念,重點介紹了Spring Cloud Stream的編程模型,爲後面文章實戰應用和自定義奠基一些基礎。Spring Cloud Stream封裝了多種消息中間件的操做接口,目前只有kafka和rabbitmq,下一篇將會介紹如何自已實現一個Rocketmq的綁定器。