Spring Cloud Stream應用與自定義RocketMQ Binder:編程模型

前言: 本文做者張天,節選自筆者與其合著的《Spring Cloud微服務架構進階》,即將在八月出版問世。本文將其中Spring Cloud Stream應用與自定義Rocketmq Binder的內容抽取出來,主要介紹Spring Cloud Stream的相關概念,並概述相關的編程模型。前端

概述

Spring Cloud Stream 簡介

Spring Cloud Stream 是一個用來爲微服務應用構建消息驅動能力的框架。它能夠基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程序。他經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。Spring Cloud Stream目前僅支持RabbitMQ、Kafka。java

消息隊列簡介

消息隊列中間件是分佈式系統中最爲重要的組件之一,主要解決應用耦合,異步消息,流量削鋒等問題,是大型分佈式系統不可缺乏的中間件。消息隊列技術是分佈式應用間交換信息的一種技術,消息可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。經過消息隊列,應用程序能夠相對獨立地執行,它們不須要知道彼此的位置,只須要處理從消息隊列發送來的消息和向消息隊列發送消息。c++

消息隊列的主要特色是異步處理和解耦。其主要的使用場景就是將比較耗時並且不須要同步返回結果的操做做爲消息放入消息隊列。同時因爲使用了消息隊列,只要保證消息格式不變,消息的發送方和接受者並不須要彼此聯繫,也不須要受對方的影響,即解耦。web

消息隊列的使用場景有:編程

  • 跨系統的異步通訊,須要異步交互的場景均可以使用消息隊列。
  • 消息驅動的架構(EDA),系統分解爲消息隊列,消息隊列製造者和消息隊列消費者,一個是處理流程能夠根據需求拆分紅多個階段,每一個階段之間經過隊列鏈接起來。
  • 流量削鋒,它是消息隊列中的經常使用場景之一,通常在秒殺或團搶活動中使用普遍。秒殺活動,通常會由於流量過大,致使流量暴增,應用掛掉,爲解決這個問題,通常須要在應用前端加入消息隊列,來緩和流量的暴增。

在軟件的正常功能開發過程當中,開發人員並不須要去刻意的尋找消息隊列的使用場景,而是當出現性能瓶頸時,去查看業務邏輯是否存在能夠異步處理的耗時操做,若是存在的話即可以引入消息隊列來解決。不然盲目的使用消息隊列可能會增長維護和開發的成本卻沒法獲得可觀的性能提高,那就得不償失了。json

常見的消息隊列

目前業界有四款經常使用的消息隊列,它們分別是RabbitMQ、RocketMQ、ActiveMQ和Kafka。咱們這裏主要介紹前兩種。安全

RabbitMQ

RabbitMQ在2007年發佈,是一個在AMQP(高級消息隊列協議)基礎上完成的,可複用的企業消息系統,是當前最流行的消息中間件之一。 RabbitMQ的主要特性有:服務器

  • 可靠性: RabbitMQ提供了多種技術可讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞確認、發佈者證明和高可用性機制;
  • 靈活的路由:消息在到達隊列前是經過交換機進行路由的。RabbitMQ爲典型的路由邏輯提供了多種內置交換機類型。若是你有更復雜的路由需求,能夠將這些交換機組合起來使用,你甚至能夠實現本身的交換機類型,而且當作RabbitMQ的插件來使用;
  • 消息集羣:在相同局域網中的多個RabbitMQ服務器能夠聚合在一塊兒,做爲一個獨立的邏輯代理來使用;
  • 隊列高可用:隊列能夠在集羣中的機器上進行鏡像,以確保在硬件問題下還保證消息安全;
  • 多種協議的支持:RabbitMQ支持多種消息隊列協議;
  • 多語言支持:RabbitMQ的服務器端用Erlang語言編寫,其客戶端支持基本全部編程語言;
  • 管理界面: RabbitMQ有一個易用的用戶界面,使得用戶能夠監控和管理消息Broker的許多方面;
  • 跟蹤機制:若是消息異常,RabbitMQ提供消息跟蹤機制,使用者能夠跟蹤發現異常;
  • 插件機制:提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件;

RabbitMQ的優勢有:微信

  • 因爲erlang語言的特性,mq 性能較好,高併發;
  • 健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全;
  • 有消息確認機制和持久化機制,可靠性高;
  • 高度可定製的路由;
  • 管理界面較豐富,在互聯網公司也有較大規模的應用;
  • 社區活躍度高;

RabbitMQ的缺點有:架構

  • 儘管結合erlang語言自己的併發優點,性能較好,可是不利於作二次開發和維護;
  • 實現了代理架構,意味着消息在發送到客戶端以前能夠在中央節點上排隊。此特性使得RabbitMQ易於使用和部署,可是使得其運行速度較慢,由於中央節點增長了延遲,消息封裝後也比較大;
  • 須要學習比較複雜的接口和協議,學習和維護成本較高;
RocketMQ

RocketMQ出自阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並作出了本身的一些改進,消息可靠性上比 Kafka 更好。RocketMQ在阿里集團被普遍應用在訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等場景。

RocketMQ的主要特性有:

  • 是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色;
  • Producer、Consumer、隊列均可以分佈式;
  • Producer向一些隊列輪流發送消息,隊列集合稱爲Topic,Consumer若是作廣播消費,則一個consumer實例消費這個Topic對應的全部隊列,若是作集羣消費,則多個Consumer實例平均消費這個topic對應的隊列集合;
  • 可以保證嚴格的消息順序;
  • 提供豐富的消息拉取模式;
  • 高效的訂閱者水平擴展能力;
  • 實時的消息訂閱機制;
  • 億級消息堆積能力;
  • 較少的依賴;

RocketMQ的優勢有:

  • 單機支持 1 萬以上持久化隊列;
  • RocketMQ 的全部消息都是持久化的,先寫入系統 PAGECACHE,而後刷盤,能夠保證內存與磁盤都有一份數據;
  • 模型簡單,接口易用(JMS 的接口不少場合並不太實用);
  • 性能很是好,能夠大量堆積消息在broker中;
  • 支持多種消費,包括集羣消費、廣播消費等。
  • 各個環節分佈式擴展設計,主從HA;

RocketMQ的缺點有:

  • 支持的客戶端語言很少,目前是java及c++,其中c++不成熟;
  • RocketMQ社區關注度及成熟度也不及前二者;
  • 沒有web管理界面,提供了一個CLI(命令行界面)管理工具帶來查詢、管理和診斷各類問題;
  • 沒有在消息隊列的核心部分實現JMS等接口;

原理簡介

如圖是Stream源碼的流程圖。Stream首先會動態註冊相關BeanDefinition,而且處理@StreamListener註解;而後在Bean實例初始化以後,會調用BindingService進行服務綁定;BindingService在綁定服務時會首先獲取特定的Binder綁定器,而後綁定Producer和Consumer;最後Stream的相關實例就會進行發送和接受消息的處理。

編程模型

Spring Cloud Stream提供了一系列的預先定義的註解來聲明輸入型和輸出型channel,業務系統基於這些channel與消息中間件進行通訊,而不是直接與消息中間件進行通訊。

聲明和綁定Channels

經過給業務應用的配置類添加@EnableBinding註解來將一個Spring應用轉變成Spring Cloud Stream應用。@EnableBinding註解自己擁有@Configuration元註解來進行相關配置而且會觸發Spring Cloud Stream框架的初始化機制。

@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}
複製代碼

@EnableBinding註解可使用聲明輸入型和輸出行channel的接口類做爲其value屬性值。@EnableBinding註解只能使用在業務系統的Configuration類上,能夠提供儘量多的接口類做爲該註解的value屬性值,好比說@EnableBinding(value={Order.class, Payment.class}),Order和Payment都是聲明瞭channel的接口類。 在Spring Cloud Stream應用中,接口類能夠經過被@Input@Output註解修飾的函數來聲明的輸入型和輸出型channels。

public interface OnlineStore{
    @Input
    SubscribableChannel orders();  #聲明輸入型channel,表示接收訂單
    @Output
    MessageChannel stock();         #聲明輸出型channel,表示向供應商進貨
}
複製代碼

使用這個接口類看成@EnableBinding的value屬性值能夠觸發Stream框架的初始化機制,建立兩個channel,名字分別爲orders和stock,orders是輸入型channel,而stock是輸出型channel。

@EnableBinding(OnlineStore.class)
public class ShopConfiguration {
   ...
}
複製代碼

自定義信道

使用@Input@Output註解,編程人員能夠給每一個信道一個自定義的名稱,使用這個自定義信道,能夠與消息對立中相應的Channel進行交互。

public interface OnlineStore{
    @Input("inboundOrders")
    SubscribableChannel orders();
}
複製代碼

在上邊代碼示例中,自定義信道的名稱爲inboundOrders,Stream框架會建立出名爲inboundOrders的信道。

Spring Cloud Stream提供了預先設置的三種接口來定義輸入型channel和輸出型channel,它們是Source、Sink和Processor。Source用來聲明輸出型channel,它的信道名稱爲output。Sink用來聲明輸入型channel,它的信道名稱爲input。Processor則用來聲明輸出輸入型的channel。

# Source
public interface Source {
  String OUTPUT = "output";
  @Output(Source.OUTPUT)
  MessageChannel output();
}
# Sink
public interface Sink {
  String INPUT = "input";
  @Input(Sink.INPUT)
  SubscribableChannel input();
}
# Processor
public interface Processor extends Source, Sink {
}
複製代碼

產生和消費消息

使用Spring Integration註解或者Spring Cloud Stream的@StreamListener註解能夠進行消息的發送和消費。@StreamListener註解基於Spring Messaging註解(好比說@MessageMapping,@JmsListener,@RabbitListener),除此以外,該註解添加了內容(content)類型管理和類型強制等特性。

做爲Spring Integration的補充,Spring Cloud Stream提供了它本身的@StreamListener註解,該註解構建在Spring Messaging註解的基礎上,好比說@MessageMapping、@JmsListener和@RabbitListener@StreamListener註解提供了更加簡便處理輸入消息的模型。

Spring Cloud Stream提供了可擴展的消息轉換(MessageConverter)機制來處理數據轉換,並將轉換後的數據分配給對應的被@StreamListener修飾的方法。下面這個例子展現了一個處理外部訂單消息的應用。

@EnableBinding(Sink.class)
public class OrderHandler {
  @Autowired
  OrderService orderService;
  @StreamListener(Sink.INPUT)
  public void handle(Order order) {
    orderService.handle(order);
  }
}
複製代碼

假設,輸入的Message對象有一個string類型的Payload和一個值爲application/json的contentType。在使用@StreamListener時,MessageConverter會使用消息的contentType來解析String類型的Payload並賦值給Order對象。 就像其餘的Spring Messaging方法同樣,被@StreamListener註解的方法的參數可使用@Payload@Headers進行註解。對於返回數據的方法,必須使用@SendTo註解來指定該返回數據發送到哪一個輸出型channel。

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Autowired
  VotingService votingService;
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}
複製代碼

Spring Cloud Stream支持將消息分配到多個@StreamListener修飾的方法。爲了能使用該分配機制,一個方法必須首先知足下列條件:

  • 方法不能有返回值。
  • 方法必須是單獨一類消息的處理函數。

使用註解的condition屬性中的SpEL表達式能夠設置@StreamListener接收消息的條件判斷。全部匹配了該condition的方法都會在同一個線程中被調用,可是方法調用相對順序不能保證。

下面就是一個@StreamListener分配消息的例子。在這個例子中,全部頭部屬性type對應的值爲food的消息都會被分配給receiveFoodOrder方法,全部頭部屬性type對應的值爲compute的消息都會被分配給receiveComputeOrder方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='food'")
    public void receiveFoodOrder(@Payload FoodOrder foodOrder) {
       // handle the message
    }
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='compute'")
    public void receiveComputeOrder(@Payload ComputeOrder computeOrder) {
       // handle the message
    }
}
複製代碼

小結

本文主要介紹了Spring Cloud Stream中涉及到的相關概念,重點介紹了Spring Cloud Stream的編程模型,爲後面文章實戰應用和自定義奠基一些基礎。Spring Cloud Stream封裝了多種消息中間件的操做接口,目前只有kafka和rabbitmq,下一篇將會介紹如何自已實現一個Rocketmq的綁定器。

訂閱最新文章,歡迎關注個人公衆號

微信公衆號
相關文章
相關標籤/搜索