Spring Cloud Stream

Spring Cloud Stream 是消息中間件組件,它集成了 kafka 和 rabbitmq 。Spring Cloud Stream是一個用於構建消息驅動的微服務應用程序的框架,是一個基於Spring Boot 建立的獨立生產級的,使用Spring Integration提供鏈接到消息代理的Spring應用。html

Spring Cloud Stream與各模塊之間的關係是:java

SCS 在 Spring Integration 的基礎上進行了封裝,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;
SCS 與 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;
SCS 與 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置類;
SCS 加強了消息發送失敗的和消費失敗狀況下的處理邏輯等功能。
SCS 是 Spring Integration 的增強,同時與 Spring Boot 體系進行了融合,也是 Spring Cloud Bus 的基礎。它屏蔽了底層消息中間件的實現細節,但願以統一的一套 API 來進行消息的發送/消費,底層消息中間件的實現細節由各消息中間件的 Binder 完成。git

Binder 是提供與外部消息中間件集成的組件,爲構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。目前官方的實現有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現了 RocketMQ Binder。github

 

瞭解SpringCloud流的時候,咱們會發現,SpringCloud還有個Data Flow(數據流)的項目,下面是它們的區別:spring

  1. Spring Cloud Stream:數據流操做開發包,封裝了與Redis,Rabbit、Kafka等發送接收消息。是一套用於建立消息驅動(message-driven)微服務的框架。經過向主程序添加@EnableBinding,能夠當即鏈接到消息代理,經過向方法添加@StreamListener,您將收到流處理事件。數據庫

  2. Spring Cloud Data Flow:大數據操做工具,做爲Spring XD的替代產品,它是一個混合計算模型,結合了流數據與批量數據的處理方式。是構建數據集成和實時數據處理流水線的工具包。編程

  3. Spring Cloud Data Flow的其中一個章節是包含了Spring Cloud Stream,因此應該說Spring Cloud Data Flow的範圍更廣,是相似於一種解決方案的集合,而Spring Cloud Stream只是一套消息驅動的框架。springboot

  4. Spring Cloud Stream是在Spring Integration的基礎上發展起來的。它爲開發人員提供了一致的開發經驗,以構建能夠包含企業集成模式以與外部系統(例如數據庫,消息代理等)鏈接的應用程序。服務器

 

如圖所示,Spring Cloud Stream由一箇中間件中立的核組成。應用經過Spring Cloud Stream插入的input(至關於消費者consumer,它是從隊列中接收消息的)和output(至關於生產者producer,它是從隊列中發送消息的。)通道與外界交流。網絡

結論:

一、Spring Cloud Stream以消息做爲流的基本單位,因此它已經不是狹義上的IO流,而是廣義上的數據流動,從生產者到消費者的數據流動。

二、Spring Cloud Stream 最大的方便之處,莫過於抽象了事件驅動的一些概念,對於消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件,切換topic。使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程。

 

消息中間件幾大應用場景

一、異步處理

好比用戶在電商網站下單,下單完成後會給用戶推送短信或郵件,發短信和郵件的過程就能夠異步完成。由於下單付款是核心業務,發郵件和短信並不屬於核心功能,而且可能耗時較長,因此針對這種業務場景能夠選擇先放到消息隊列中,有其餘服務來異步處理。

二、應用解耦:

假設公司有幾個不一樣的系統,各系統在某些業務有聯動關係,好比 A 系統完成了某些操做,須要觸發 B 系統及 C 系統。若是 A 系統完成操做,主動調用 B 系統的接口或 C 系統的接口,能夠完成功能,可是各個系統之間就產生了耦合。用消息中間件就能夠完成解耦,當 A 系統完成操做將數據放進消息隊列,B 和 C 系統去訂閱消息就能夠了。這樣各系統只要約定好消息的格式就行了。

三、流量削峯

好比秒殺活動,一會兒進來好多請求,有的服務可能承受不住瞬時高併發而崩潰,因此針對這種瞬時高併發的場景,在中間加一層消息隊列,把請求先入隊列,而後再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。

四、日誌處理

kafka 最開始就是專門爲了處理日誌產生的。

當碰到上面的幾種狀況的時候,就要考慮用消息隊列了。若是你碰巧使用的是 RabbitMQ 或者 kafka ,並且一樣也是在使用 Spring Cloud ,那能夠考慮下用 Spring Cloud Stream。

使用 Spring Cloud Stream && RabbitMQ

介紹下面的例子以前,假定你已經對 RabbitMQ 有必定的瞭解。

Destination Binders:目標綁定器,目標指的是 kafka 仍是 RabbitMQ,綁定器就是封裝了目標中間件的包。若是操做的是 kafka 就使用 kafka binder ,若是操做的是 RabbitMQ 就使用 rabbitmq binder。

Destination Bindings:外部消息傳遞系統和應用程序之間的橋樑,提供消息的「生產者」和「消費者」(由目標綁定器建立)

Message:一種規範化的數據結構,生產者和消費者基於這個數據結構經過外部消息系統與目標綁定器和其餘應用程序通訊。

主要概念(Main Concepts)

首先來認識一下 Spring Cloud Stream 中的幾個重要概念:

應用模型:應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中Binder 交互,經過咱們配置來綁定,而 Spring Cloud Stream 的 Binder 負責與中間件交互。因此,咱們只須要搞清楚如何與 Spring Cloud Stream 交互就能夠方便使用消息驅動的方式。

 

抽象綁定器(The Binder Abstraction)

Spring Cloud Stream實現Kafkat和RabbitMQ的Binder實現,也包括了一個TestSupportBinder,用於測試。你也能夠寫根據API去寫本身的Binder.

 

Spring Cloud Stream 一樣使用了Spring boot的自動配置,而且抽象的Binder使Spring Cloud Stream的應用得到更好的靈活性,好比:咱們能夠在application.yml或application.properties中指定參數進行配置使用Kafka或者RabbitMQ,而無需修改咱們的代碼。

 

在前面咱們測試的項目中並無修改application.properties,自動配置得益於Spring Boot

​ 經過 Binder ,能夠方便地鏈接中間件,能夠經過修改application.yml中的spring.cloud.stream.bindings.input.destination 來進行改變消息中間件(對應於Kafka的topic,RabbitMQ的exchanges)

​ 在這二者間的切換甚至不須要修改一行代碼。

  1. 發佈-訂閱(Persistent Publish-Subscribe Support)

    以下圖是經典的Spring Cloud Stream的 發佈-訂閱 模型,生產者 生產消息發佈在shared topic(共享主題)上,而後 消費者 經過訂閱這個topic來獲取消息

其中topic對應於Spring Cloud Stream中的destinations(Kafka 的topic,RabbitMQ的 exchanges)

官方文檔這塊原理說的有點深,就沒寫,詳見官方文檔

消費組(Consumer Groups)

儘管發佈-訂閱 模型經過共享的topic鏈接應用變得很容易,可是經過建立特定應用的多個實例的來擴展服務的能力一樣重要,可是若是這些實例都去消費這條數據,那麼極可能會出現重複消費的問題,咱們只須要同一應用中只有一個實例消費該消息,這時咱們能夠經過消費組來解決這種應用場景, 當一個應用程序不一樣實例放置在一個具備競爭關係的消費組中,組裏面的實例中只有一個可以消費消息

設置消費組的配置爲spring.cloud.stream.bindings.<channelName>.group

下面舉一個DD博客中的例子:

下圖中,經過網絡傳遞過來的消息經過主題,按照分組名進行傳遞到消費者組中

此時能夠經過spring.cloud.stream.bindings.input.group=Group-Aspring.cloud.stream.bindings.input.group=Group-B進行指定消費組

全部訂閱指定主題的組都會收到發佈消息的一個備份,每一個組中只有一個成員會收到該消息;若是沒有指定組,那麼默認會爲該應用分配一個匿名消費者組,與全部其它組處於 訂閱-發佈 關係中。ps:也就是說若是管道沒有指定消費組,那麼這個匿名消費組會與其它組一塊兒消費消息,出現了重複消費的問題。

  1. 消費者類型(Consumer Types)

    1)支持有兩種消費者類型:

    • Message-driven (消息驅動型,有時簡稱爲異步)
    • Polled (輪詢型,有時簡稱爲 同步)

    在Spring Cloud 2.0版本前只支持 Message-driven這種異步類型的消費者,消息一旦可用就會傳遞,而且有一個線程能夠處理它;當你想控制消息的處理速度時,可能須要用到同步消費者類型。

    2)持久化

    通常來講全部擁有訂閱主題的消費組都是持久化的,除了匿名消費組。 Binder的實現確保了全部訂閱關係的消費訂閱是持久的,一個消費組中至少有一個訂閱了主題,那麼被訂閱主題的消息就會進入這個組中,不管組內是否中止。

    注意: 匿名訂閱自己是非持久化的,可是有一些Binder的實現(好比RabbitMQ)則能夠建立非持久化的組訂閱

    一般狀況下,當有一個應用綁定到目的地的時候,最好指定消費消費組。擴展Spring Cloud Stream應用程序時,必須爲每一個輸入綁定指定一個使用者組。這樣作能夠防止應用程序的實例接收重複的消息(除非須要這種行爲,這是不尋常的)。

  2. 分區支持(Partitioning Support)

    在消費組中咱們能夠保證消息不會被重複消費,可是在同組下有多個實例的時候,咱們沒法肯定每次處理消息的是否是被同一消費者消費,分區的做用就是爲了確保具備共同特徵標識的數據由同一個消費者實例進行處理,固然前邊的例子是狹義的,通訊代理(broken topic)也能夠被理解爲進行了一樣的分區劃分。Spring Cloud Stream 的分區概念是抽象的,能夠爲不支持分區Binder實現(例如RabbitMQ)也可使用分區。

 

 

注意:要使用分區處理,你必須同時對生產者和消費者進行配置。

編程模型(Programming Model)

爲了理解編程模型,須要熟悉下列核心概念:

  • Destination Binders(目的地綁定器): 負責與外部消息系統集成交互的組件
  • Destination Bindings(目的地綁定): 在外部消息系統和應用的生產者和消費者之間的橋樑(由Destination Binders建立)
  • Message (消息): 用於生產者、消費者經過Destination Binders溝通的規範數據。
  1. Destination Binders(目的地綁定器)

    Destination Binders是Spring Cloud Stream與外部消息中間件提供了必要的配置和實現促進集成的擴展組件。集成了生產者和消費者的消息的路由、鏈接和委託、數據類型轉換、用戶代碼調用等。

    儘管Binders幫咱們處理了許多事情,咱們仍須要對他進行配置。以後會講

  2. Destination Bindings (目的地綁定)

    如前所述,Destination Bindings 提供鏈接外部消息中間件和應用提供的生產者和消費者中間的橋樑。

    使用@EnableBinding 註解打在一個配置類上來定義一個Destination Binding,這個註解自己包含有@Configuration,會觸發Spring Cloud Stream的基本配置。

接下來的例子展現徹底配置且正常運行的Spring Cloud Stream應用,由INPUT接收消息轉換成String 類型並打印在控制檯上,而後轉換出一個大寫的信息返回到OUTPUT中。

@SpringBootApplication @EnableBinding(Processor.class) public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public String handle(String value) { System.out.println("Received: " + value); return value.toUpperCase(); } }

經過SendTo註解將方法內返回值轉發到其餘消息通道中,這裏由於沒有定義接收通道,提示消息已丟失,解決方法是新建一個接口,以下

public interface MyPipe{ //方法1 @Input(Processor.OUTPUT) //這裏使用Processor.OUTPUT是由於要同一個管道,或者名稱相同 SubscribableChannel input(); //還能夠以下這樣=====二選一便可========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input(); }

而後在在上邊的方法下邊加一個方法,並在@EnableBinding註解中改爲@EnableBinding({Processor.class, MyPipe.class})

@StreamListener(MyPipe.INPUT) public void handleMyPipe(String value) { System.out.println("Received: " + value); }

Spring Cloud Stream已經爲咱們提供了三個綁定消息通道的默認實現

  • Sink:經過指定消費消息的目標來標識消息使用者的約定。
  • Source:與Sink相反,用於標識消息生產者的約定。
  • Processor:集成了Sink和Source的做用,標識消息生產者和使用者

他們的源碼分別爲:

public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); } public interface Processor extends Source, Sink { }

Sink和Source中分別經過@Input和@Output註解定義了輸入通道和輸出通道,經過使用這兩個接口中的成員變量來定義輸入和輸出通道的名稱,Processor因爲繼承自這兩個接口,因此同時擁有這兩個通道。

注意:擁有多條管道的時候不能有輸入輸出管道名相同的,不然會出現發送消息被本身接收或報錯的狀況

咱們能夠根據上述源碼的方式來定義咱們本身的輸入輸出通道,定義輸入通道須要返回SubscribaleChannel接口對象,這個接口繼承自MessageChannel接口,它定義了維護消息通道訂閱者的方法;定義輸出通道則須要返回MessageChannel接口對象,它定義了向消息通道發送消息的方法。

自定義消息通道 發送與接收

依照上面的內容,咱們也能夠建立本身的綁定通道 若是你實現了上邊的MyPipe接口,那麼直接使用這個接口就好

  1. 和主類同包下建一個MyPipe接口,實現以下
package com.cnblogs.hellxz; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.SubscribableChannel; public interface MyPipe { //方法1 // @Input(Source.OUTPUT) //Source.OUTPUT的值是output,咱們自定義也是同樣的 // SubscribableChannel input(); //使用@Input註解標註的輸入管道須要使用SubscribableChannel來訂閱通道 //========二選一使用=========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input(); }

這裏用Source.OUTPUT和第二種方法 是同樣的,咱們只要將消息發送到名爲output的管道中,那麼監聽output管道的輸入流一端就能得到數據

  1. 擴展主類,添加監聽output管道方法
@StreamListener(MyPipe.INPUT) public void receiveFromMyPipe(Object payload){ logger.info("Received: "+payload); }
  1. 在主類的頭上的@EnableBinding改成@EnableBinding({Sink.class, MyPipe.class}),加入了Mypipe接口的綁定

  2. 在test/java下建立com.cnblogs.hellxz,並在包下新建一個測試類,以下

    package com.cnblogs.hellxz; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @EnableBinding(value = {Source.class}) @SpringBootTest public class TestSendMessage { @Autowired private Source source; //注入接口和注入MessageChannel的區別在於發送時需不須要調用接口內的方法 @Test public void testSender() { source.output().send(MessageBuilder.withPayload("Message from MyPipe").build()); //假設注入了MessageChannel messageChannel; 由於綁定的是Source這個接口, //因此會使用其中的惟一產生MessageChannel的方法,那麼下邊的代碼會是 //messageChannel.send(MessageBuilder.withPayload("Message from MyPipe").build()); } }
  3. 啓動主類,清空輸出,運行測試類,而後你就會獲得在主類的控制檯的消息以log形式輸出Message from MyPipe

咱們是經過注入消息通道,並調用他的output方法聲明的管道得到的MessageChannel實例,發送的消息

管道注入過程當中可能會出現的問題

經過注入消息通道的方式雖然很直接,可是也容易犯錯,當一個接口中有多個通道的時候,他們返回的實例都是MessageChannel,這樣經過@Autowired注入的時候每每會出現有多個實例找到沒法肯定須要注入實例的錯誤,咱們能夠經過@Qualifier指定消息通道的名稱,下面舉例:

  1. 在主類包內建立一個擁有多個輸出流的管道

    /** * 多個輸出管道 */ public interface MutiplePipe { @Output("output1") MessageChannel output1(); @Output("output2") MessageChannel output2(); }
  2. 建立一個測試類

    @RunWith(SpringRunner.class) @EnableBinding(value = {MutiplePipe.class}) //開啓綁定功能 @SpringBootTest //測試 public class TestMultipleOutput { @Autowired private MessageChannel messageChannel; @Test public void testSender() { //向管道發送消息 messageChannel.send(MessageBuilder.withPayload("produce by multiple pipe").build()); } }

    啓動測試類,會出現剛纔說的不惟一的bean,沒法注入

    Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.messaging.MessageChannel' available: expected single matching bean but found 6: output1,output2,input,output,nullChannel,errorChannel

    咱們在@Autowired旁邊加上@Qualifier("output1"),而後測試就能夠正常啓動了

    經過上邊的錯誤,咱們能夠清楚的看到,每一個MessageChannel都是使用消息通道的名字作爲bean的名稱。

    這裏咱們沒有使用監聽這個管道,僅爲了測試並發現問題

經常使用配置

消費組和分區的設置

給消費者設置消費組和主題

  1. 設置消費組: spring.cloud.stream.bindings.<通道名>.group=<消費組名>
  2. 設置主題: spring.cloud.stream.bindings.<通道名>.destination=<主題名>

給生產者指定通道的主題:spring.cloud.stream.bindings.<通道名>.destination=<主題名>

消費者開啓分區,指定實例數量與實例索引

  1. 開啓消費分區: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
  2. 消費實例數量: spring.cloud.stream.instanceCount=1 (具體指定)
  3. 實例索引: spring.cloud.stream.instanceIndex=1 #設置當前實例的索引值

生產者指定分區鍵

  1. 分區鍵: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分區鍵>
  2. 分區數量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分區數量>

什麼是消息驅動?

SpringCloud Stream消息驅動能夠簡化開發人員對消息中間件的使用複雜度,讓系統開發人員更多盡力專一與核心業務邏輯的開發。SpringCloud Stream基於SpringBoot實現,自動配置化的功能能夠幫助咱們快速上手學習,相似與咱們以前學習的orm框架,能夠平滑的切換多種不一樣的數據庫。

目前SpringCloud Stream 目前只支持 RabbitMQ和kafka。

 

stream這個項目讓咱們沒必要經過繁瑣的自定義ampq來創建exchange,通道名稱,以及隊列名稱和路由方式。只須要簡單幾步咱們就輕鬆使用stream完成推送到rabbitmq和kafafa,並完成監聽工做。

消息驅動原理

綁定器

經過定義綁定器做爲中間層,實現了應用程序與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel經過,是的應用程序不須要再考慮各類不一樣的消息中間件的實現。當須要升級消息中間件,或者是更換其餘消息中間件產品時,咱們須要作的就是更換對應的Binder綁定器而不須要修改任何應用邏輯 。

在該模型圖上有以下幾個核心概念:

  • Source: 當須要發送消息時,咱們就須要經過Source,Source將會把咱們所要發送的消息(POJO對象)進行序列化(默認轉換成JSON格式字符串),而後將這些數據發送到Channel中;
  • Sink: 當咱們須要監聽消息時就須要經過Sink來,Sink負責從消息通道中獲取消息,並將消息反序列化成消息對象(POJO對象),而後交給具體的消息監聽處理進行業務處理;
  • Channel: 消息通道是Stream的抽象之一。一般咱們向消息中間件發送消息或者監聽消息時須要指定主題(Topic)/消息隊列名稱,但這樣一旦咱們須要變動主題名稱的時候須要修改消息發送或者消息監聽的代碼,可是經過Channel抽象,咱們的業務代碼只須要對Channel就能夠了,具體這個Channel對應的是那個主題,就能夠在配置文件中來指定,這樣當主題變動的時候咱們就不用對代碼作任何修改,從而實現了與具體消息中間件的解耦;
  • Binder: Stream中另一個抽象層。經過不一樣的Binder能夠實現與不一樣消息中間件的整合,好比上面的示例咱們所使用的就是針對Kafka的Binder,經過Binder提供統一的消息收發接口,從而使得咱們能夠根據實際須要部署不一樣的消息中間件,或者根據實際生產中所部署的消息中間件來調整咱們的配置。

 

消息驅動有通道,綁定MQ。

生產者消息傳遞到通道里面以後,通道是跟MQ作綁定,封裝的。消息一旦到MQ以後,發送給消費者通道,而後消費者進行消費 。綁定部分是底層幫助實現的。

封裝也只是實現了部分功能。MQ的功能不是百分百都實現了的。

Spring Cloud Stream介紹

Spring Cloud Stream是一個用於構建消息驅動的微服務應用程序的框架,是一個基於Spring Boot 建立的獨立生產級的,使用Spring Integration提供鏈接到消息代理的Spring應用。介紹持久發佈 - 訂閱(persistent publish-subscribe)的語義,消費組(consumer groups)分區(partitions)的概念。

你能夠添加@EnableBinding註解在你的應用上,從而當即鏈接到消息代理,在方法上添加@StreamListener以使其接收流處理事件,下面的例子展現了一個Sink應用接收外部信息

@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {

  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }
}

@EnableBinding註解會帶着一個或多個接口做爲參數(舉例中使用的是Sink的接口),一個接口每每聲名了輸入和輸出的渠道,Spring Stream提供了SourceSinkProcessor這三個接口,你也能夠本身定義接口。

stream默認提供的消費者和生產者接口:

public interface Sink {
  String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }

@Input註解區分了一個輸入channel,經過它接收消息到應用中,使用@Output註解 區分輸出channel,消息經過它離開應用,使用這兩個註解能夠帶一個channel的名字做爲參數,若是未提供channel名稱,則使用帶註釋的方法的名稱。

你可使用Spring Cloud Stream 現成的接口,也可使用@Autowired注入這個接口,下面在測試類中舉例

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class LoggingConsumerApplicationTests {

    @Autowired
    private Sink sink;

    @Test
    public void contextLoads() {
        assertNotNull(this.sink.input());
    }
}

 

 

首先,stream提供了默認的輸入和輸出經過。若是咱們不須要多個通道,能夠經過@Enbalebing(Sink.Class)來綁定輸入通道。對應的application裏面的

 # rabbitmq默認地址配置
  rabbitmq:
    host: asdf.me
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input:
         destination: push-exchange
        output:
          destination: push-exchange

 

這樣會自動創建一個exchange爲push-exchange名字的輸出通道。同理@Enbalebing(Input.Class)是綁定輸入通道的。下面建立一個生產者和消費者:

@EnableBinding(Source.class)
public class Producer {
    @Autowired
    @Output(Source.OUTPUT)
    private MessageChannel channel;
 
    public void send() {
        channel.send(MessageBuilder.withPayload("producer" + UUID.randomUUID().toString()).build());
    }

 

消費者:

@EnableBinding(Sink.class)
public class Consumer {
    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message) {
        System.out.println("接收到MQ消息:" + JSONObject.toJSONString(message));
    }
}

stream默認提供的消費者和生產者接口:

public interface Sink {
  String INPUT = "input";
 
  @Input(Sink.INPUT)
  SubscribableChannel input();
}
public interface Source {
    String OUTPUT = "output";
 
    @Output("output")
    MessageChannel output();
}

能夠看出,會去找到咱們在application.yaml裏面定義的input,output下面的destination。分別做爲輸入和輸出通道。咱們也能夠本身定義接口來實現:

    String WS_INPUT = "ws-consumer";
    String EMAIL_INPUT = "email-consumer";
    String SMS_INPUT = "sms-consumer";
    @Input(MqMessageInputConfig.EMAIL_INPUT)
    SubscribableChannel emailChannel();

    @Input(MqMessageInputConfig.WS_INPUT)
    SubscribableChannel wsChannel();

    @Input(MqMessageInputConfig.SMS_INPUT)
    SubscribableChannel smChannel();
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MqMessageOutputConfig {
    String MESSAGE_OUTPUT = "message-producter";

    @Output(MqMessageOutputConfig.MESSAGE_OUTPUT)
    MessageChannel outPutChannel();

}
坑1.須要注意的是,最好不要自定義輸入輸出在同一個類裏面。這樣,若是咱們只調用生產者發送消息。會致使提示Dispatcher has no subscribers for channel。而且會讓咱們發送消息的次數莫名減小几回。詳細狀況能夠查看gihub官方issue,也給出的這種解決方法 官方解決方式
創建一個testjunit類,而後使用生產者發送消息。消費者監聽隊列獲取消息。
接收到MQ消息:{"headers":{"amqp_receivedDeliveryMode":"PERSISTENT","amqp_receivedRoutingKey":"my-test-channel","amqp_receivedExchange":"my-test-channel","amqp_deliveryTag":1,"amqp_consumerQueue":"my-test-channel.anonymous.vYA2O6ZSQE-S9MOnE0ZoJQ","amqp_redelivered":false,"id":"805e7fc3-a046-e07a-edf5-def58d9c8eab","amqp_consumerTag":"amq.ctag-QwsmRKg5f0DGSp-7wbpYxQ","contentType":"text/plain","timestamp":1523930106483},"payload":"22222222222a7d24456-5b11-4c25-9270-876e7bbc556a"} 
坑2.stream生成的exchang默認是topic模式。就是按照前綴匹配,發送消息給對應的隊列。
 

 

  •  *(星號):能夠(只能)匹配一個單詞
  • #(井號):能夠匹配多個單詞(或者零個)

  • fanout:廣播模式,發送到全部的隊列

  • direct:直傳。徹底匹配routingKey的隊列能夠收到消息。

坑3.默認消息異常以後,都會往死消息隊列裏面寫,然而異常是放到一個header裏面去的。默認消息隊列支持的最大frame_max 是128kb,超過這個大小,服務器就主動給你關閉鏈接,而後把你的消息會不斷的重試。
坑4.看到國內好多博客,使用@Input和@output都是用MessageChannel,這是不對的。@Output對MessageChannel,@Input對應SubscribableChannel 。切記!
坑5.我使用的stream版本是1.2.1,springboot版本時1.5.6。沒有辦法使用routingkey屬性,即在spring.cloud.stream.rabbit這個屬性沒法顯示。應該是個人stream版本偏低吧。遇到這種狀況,你們果斷換新版本,或者使用自帶的ampq來實現吧。
坑6.stream的destination對應生成rabbitmq的exchange。加上了group後,例如destination:wx-consumer,group:queue。那麼通過stream後隊列名稱會變成wx-consumer.queue。若是使用group對應的是持久化隊列,不會被rabbitmq刪除。
 示例:https://www.cnblogs.com/fengzheng/p/11576661.html
相關文章
相關標籤/搜索