九. SpringCloud Stream消息驅動

1. 消息驅動概述

1.1 是什麼

在實際應用中有不少消息中間件,好比如今企業裏經常使用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學習全部這些消息中間件無疑須要大量時間經歷成本,那有沒有一種技術,使咱們再也不須要關注具體的消息中間件的細節,而只須要用一種適配綁定的方式,自動的在各類消息中間件內切換呢?消息驅動就是這樣的技術,它能 屏蔽底層消息中間件的差別,下降切換成本,統一消息的編程模型java

SpringCloud Stream是一個構件消息驅動微服務的框架。應用程序經過inputs和outputs來與SpringCloud Stream中的綁定器(binder)對象交互,經過配置來綁定,而SpringCloud Stream的綁定器對象負責與消息中間件交互,因此,咱們只須要搞清楚如何與SpringCloud Stream交互就能夠方便使用消息驅動的方式。可是 截至到目前 SpringCloud Stream僅支持RabbitMQ和Kafkaweb

1.2 設計思想

標準MQ模型spring

  • 生產者 / 消費者之間靠消息媒介傳遞信息內容 - Messag
  • 消息必須走特定的通道 - Message Channel
  • 消息通道里的消息如何被消費呢?誰負責處理? - 消息通道 MessageChannel 的子接口 SubscribableChannel,由 MessageHandler 消息處理器所訂閱
image-20210304184605474

爲何使用Cloud Stream數據庫

好比說咱們用到了RabbitMQ和Kafka,因爲這兩個消息中間件的架構上的不一樣,像RabbitMQ有exchange,Kafka有Topic和Partitions分區,這些中間件的差別性致使實際項目開發給咱們形成了必定的困擾,咱們若是用了兩個消息隊列的其中一種,後面的業務需求若是又要往另一種消息隊列進行遷移,這無疑是一個災難,一大堆東西都要從新推到重作,由於它跟咱們的系統耦合了,這時候SpringCloud Stream給咱們提供了一種解耦合的方式。編程

image-20210304185448484

stream憑什麼能夠統一底層差別json

在沒有綁定器這個概念的狀況下,咱們的SpringBoot應用要直接與消息中間件進行信息交互的時候,因爲各消息中間件構建的初衷不一樣,它們的實現細節上會有較大的差別性。api

經過定義綁定器做爲中間層,完美的實現了 應用程序與消息中間件細節之間的隔離。Stream對消息中間件的進一步封裝(經過嚮應用程序暴露統一的Channel通道,使得應用程序不須要再考慮各類不一樣的消息中間件實現),能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件(如RabbitMQ切換爲Kafka),使得微服務開發的高度解耦,服務能夠更多的關注本身的業務流程。架構

在消息綁定器中,INPUT對應於消費者,OUTPUT對應於生產者app

Stream中的消息通訊方式遵循了 發佈-訂閱模式,用Topic(主題)進行廣播(RabbitMQ中對應於Exchange交換機,Kafka中就是Topic)。負載均衡

1.3 SpringCloud Stream標準流程套路
  • Binder 很方便的鏈接中間件,屏蔽差別
  • Channel 通道,是隊列Queue的一種抽象,在消息通信系統中就是實現了存儲和轉發的媒介,經過Channel對隊列進行配置
  • SourceSink 簡單的能夠理解爲參照對象是SpringCloud Stream自身,從Stream發佈消息就是輸出,接受消息就是輸入
image-20210304191045523
1.4 SpringCloud Stream編碼API與經常使用註解
image-20210304191011194
組成 說明
Middleware 中間件,目前只支持RabbitMQ和Kafka
Binder Binder是應用與消息中間件之間的封裝,目前實行了RabbitMQ和Kafka的Binder,經過Binder能夠很方便的鏈接中間件,能夠動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些均可以經過配置文件來實現
@Input 註解標識輸入通道,經過該輸入通道接收到的消息進入應用程序
@Output 註解標識輸出通道,發佈的消息將經過該通道離開應用程序
@StreamListner 監聽隊列,用於消費者的隊列的消息接收
@EnableBinding 使信道Channel和交換機/主題(Exchange/Topic)綁定在一塊兒

2. Spring Cloud Stream 案例

新建三個子模塊分別對應於消息的生產者和消費者:

模塊名 微服務功能
cloud-stream-rabbitmq-provider8801 生產者,發送消息模塊
cloud-stream-rabbitmq-consumer8802 消費者,接收消息模塊
cloud-stream-rabbitmq-consumer8803 消費者,接收消息模塊
2.1 消息驅動之消息生產者

新建Module:cloud-stream-rabbitmq-provider8801做爲消息的生產者用來發送消息,在其POM文件中除引入web、actuator、eureka-client等必要啓動器外,還須要引入SpringCloud Stream對應實現RabbitMQ的啓動器依賴:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

編寫其配置文件application.yml:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: mpolaris.top
                port: 5672
                username: admin
                password: 1234321
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱,OUTPUT表示這是消息的發送方
          # 表示要使用的Exchange名稱定義
          destination: testExchange 
          # 設置消息類型,本次爲json,文本則設置「text/plain」
          content-type: application/json 
          # 設置要綁定的消息服務的具體設置
          default-binder: defaultRabbit

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    # 設置心跳的時間間隔(默認是30秒)
    lease-renewal-interval-in-seconds: 2 
    # 若是如今超過了5秒的間隔(默認是90秒)
    lease-expiration-duration-in-seconds: 5 
    # 在信息列表時顯示主機名稱yml
    instance-id: send-8801.com  
    # 訪問的路徑變爲IP地址
    prefer-ip-address: true

編寫其主啓動類

編寫業務類,在業務類中分別要編寫 發送消息接口 及其 實現類,並在發送接口消息的實現類中 添加 @EnableBinding 註解 用來綁定消息的推送管道,消息生產者綁定的消息推送管道爲 org.springframework.cloud.stream.messaging.Source

public interface IMessageProvider {
    public String send();
}
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @Author polaris
 * @Date 2021/3/4 21:46
 */
@EnableBinding(Source.class) //定義消息的推送管道
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; //消息發送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build()); //發送消息
        System.out.println("==> serial:" + serial);
        return null;
    }
}

注意咱們在service的實現類中再也不須要@Service註解,由於這個service再也不是傳統意義上的和Controller、DAO數據等進行交互的service,而是要綁定綁定器打交道的service。

而後編寫其業務層的Controller:

@RestController
public class SendMessageController {
    @Autowired
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}

啓動服務註冊中心後和RabbitMQ後,啓動消息生產者微服務,咱們在RabbitMQ的控制面板中能夠看見多出了一個名爲testExchange的交換機,這個交換機偏偏就是咱們以前在配置文件中配置的交換機名字testExchange。

而後咱們訪問 http://localhost:8801/sendMessage 使用消息生產者微服務發送消息,在其微服務後臺咱們看到了打印的消息。

在RabbitMQ的控制面板中咱們也看到了確實發送了消息。

image-20210304215848131
2.2 消息驅動之消息消費者

新建Module:cloud-stream-rabbitmq-consumer8802/8803做爲消息的生產者用來接收消息,其POM文件中引入的啓動器依賴和消息生產者微服務的依賴幾乎相同,而後編寫其配置文件application.yml,其配置文件的書寫和消息生產者的幾乎一致,特別須要注意的是,消息生產者微服務用到的通道爲OUTPUT,而消息消費者微服務用到的通道爲INPUT,其餘的配置文件信息就只須要注意端口號、註冊服務名的區別便可:

spring:
  cloud:
      bindings: 
        input: # 這個名字是一個通道的名稱,INPUT表示消息消費者

編寫主啓動類

編寫消息消費者的業務類,因爲是消費者,因此只須要編寫其Controller便可,在其Controller上一樣須要添加 @EnableBinding 註解用來綁定消息的推送管道,消息消費者綁定的消息推送管道爲import org.springframework.cloud.stream.messaging.Sink,在接收消息的方法中須要使用 @StreamListner 註解來監聽其綁定的消息推送管道:

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
    
    @Value("${server.port}")
    private String serverPort;
    
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消費者" + serverPort + "號,收到消息:" 
                           + message.getPayload());
    }
}

而後啓動消息發送消費者服務,用生產者發送消息,咱們能夠發如今消費者端能夠成功接收到消息。

3. 分組消費和持久化

3.1 重複消費問題

當生產者發送消息後,此時的咱們的消費者都接受了消息並進行了消費,也就是說同一條消息被多個消息消費者所消費。

上述的問題就是消息的 重複消費 問題,那麼這個問題爲何如此重要呢?其實重複消費這個問題自己不可怕,可怕的是沒考慮到重複消費以後,怎麼保證冪等性。(冪等性 通俗的說,就一個數據,或者一個請求,重複不少次,須要確保對應的數據是不會改變的,不能出錯)。分佈式微服務應用爲了實現高可用和負載均衡,實際上同一功能的服務都會部署多個具體的服務實例。舉個例子,假設有一個系統,有一條消息要求往數據庫裏插入一條數據,要是這個消息重複消費兩次,結果就是向數據庫裏插入了兩條數據,這樣數據就錯了,就違背了冪等性原則,可是要是該消息消費到第二次的時候,能夠判斷一下已經消費過了,而後直接將該消息丟棄,這就實現了只插入一條數據,一條消息重複出現了兩次,可是隻有第一次真正被消費了,數據庫裏也就只插入了一條數據,這就保證了系統的冪等性。

上面簡單的介紹了消息的重複消費問題,那如何解決這種重複消費問題呢,那就須要咱們進行 分組和持久化屬性組 操做,利用SpringCloud Stream中的消息分組來解決這個問題,須要注意的是在Stream中處於同一組中的多個消息消費者是競爭關係,也就是保證生產者所發送的同一個消息只會被其中一個消費者消費一次。 不一樣組的消費者是能夠對消息進行全面消費(重複消費)的,只有同一組內纔會發生競爭關係

在RabbitMQ中,默認分組group是不一樣的,組流水號不同,被認爲不一樣組,咱們查看testExchange交換機,能夠發現8802和8803兩個消息消費者處於不一樣的組,因此8801消息生產者發送的消息能夠被這兩個消費者重複消費:

image-20210304230322826
3.2 分組解決重複消費問題

上面在RabbitMQ控制面板中咱們看到的組流水號是系統隨機分配的,這樣無疑很差控制,因此咱們應該自定義配置分組,將8802/8803兩個消息消費者微服務分爲同一個組,以此來解決消息的重複消費問題。

先來演示如何自定義分組

在8802/8803微服務中的配置文件中分別添加組名屬性:

spring:
  cloud:
    stream:
      bindings:
        input:
          group: A/B # 分組名稱

這裏咱們將8802設置爲A組,8803設置爲B組,而後咱們將消息消費方的兩個微服務重啓,咱們再次查看其組流水號,發現再也不是長長的隨機組流水號,而變成了咱們自定義的分組:

image-20210304230642039

此時因爲8802/8803位於兩個不一樣分組下,因此沒有競爭關係,消息生產者發送消息後,仍然能夠重複消費。

下面咱們將這兩個消息消費方微服務分到相同的消費組中,這樣每次就只有一個消費者,消息生產者發送的消息只能被8802或8803其中一個接受到,這樣就避免了重複消費,將8802和8803的分組名都改成A,再次重啓兩個消息消費方微服務,此時咱們能夠看到在分組A下已經有了兩個消費者。

image-20210304231043210

再用生產者發送5條消息,咱們發現8802/8803分別消費了3條和2條不一樣的消息,而沒有出現重複消費的問題。

3.3 持久化

經過上述,解決了重複消費問題,再來看看持久化

加上了group就自動支持持久化了

下面來演示一下持久化

  • 中止8802/8803並去除掉8802分組group:A(8803的分組group A沒有去掉)

  • 8801發送4條消息到rabbitmq

  • 先啓動8802(無分組屬性配置),後臺沒有打出來消息(消息丟失故障)

  • 再啓動8803(有分組屬性配置),後臺打出了4條消息(消費持久化消息)

相關文章
相關標籤/搜索