springcloud學習(七)之Stream

前言

MQ消息中間件⼴泛應⽤在應⽤解耦合、異步消息處理、流量削峯等場景中。java

不一樣的MQ消息中間件內部機制包括使⽤⽅式都會有所不一樣,⽐如RabbitMQ中有Exchange(交換機/交換器)這⼀概念, kafka有Topic、 Partition分區這些概念, MQ消息中間件的差別性不利於咱們上層的開發應⽤,當咱們的系統但願從原有的RabbitMQ切換到Kafka時,咱們會發現⽐較困難,不少要操做可能重來(由於應⽤程序和具體的某⼀款MQ消息中間件耦合在⼀起了)。git

Spring Cloud Stream進⾏了很好的上層抽象,可讓咱們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節差別,就像Hibernate屏蔽掉了具體數據庫(Mysql/Oracle⼀樣)。如此⼀來,咱們學習、開發、維護MQ都會變得輕鬆。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。spring

本質: 屏蔽掉了底層不一樣MQ消息中間件之間的差別,統⼀了MQ的編程模型,下降了學習、開發、維護MQ的成本sql

Stream簡介

Spring Cloud Stream 是⼀個構建消息驅動微服務的框架。應⽤程序經過inputs(至關於消息消費者consumer)或者outputs(至關於消息⽣產者producer)來與Spring Cloud Stream中的binder對象交互,⽽Binder對象是⽤來屏蔽底層MQ細節的,它負責與具體的消息中間件交互。數據庫

說⽩了:對於咱們來講,只須要知道如何使⽤Spring Cloud Stream與Binder對象交互便可編程

Stream的幾個經常使用註解

註解 描述
@Input(在消費者⼯程中使⽤) 註解標識輸⼊通道,經過該輸⼊通道接收到的消息進⼊應⽤程序
@Output(在⽣產者⼯程中使⽤) 註解標識輸出通道,發佈的消息將經過該通道離開應⽤程序
@StreamListener(在消費者⼯程中使⽤,監聽message的到來) 監聽隊列,⽤於消費者的隊列的消息的接收(有消息監聽.....)
@EnableBinding 把Channel和Exchange(對於RabbitMQ)綁定在⼀起

Stream開發實戰

生產者端

在父工程下新建子模塊lagou-cloud-stream-producer-9090,引入jar:json

<!--spring cloud stream 依賴(rabbit) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

添加rabbit相關配置:app

spring:
  application:
    name: lagou-cloud-stream-producer
  cloud:
    stream:
      binders: # 綁定MQ服務信息(此處咱們是RabbitMQ)
        lagouRabbitBinder: # 給Binder定義的名稱,⽤於後⾯的關聯
          type: rabbit # MQ類型,若是是Kafka的話,此處配置kafka
          environment: # MQ環境配置(⽤戶名、密碼等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 關聯整合通道和binder對象
        output: # output是咱們定義的通道名稱,此處不能亂改
          destination: lagouExchange # 要使⽤的Exchange名稱(消息隊列主題名稱)
          content-type: text/plain # application/json # 消息類型設置,⽐如json
          binder: lagouRabbitBinder # 關聯MQ服務

定義一個發送消息的接口及實現類:框架

public interface IMessageProduder {
    void sendMessage(String message);
}
//綁定的通道,output。springcloud stream內對輸出通道的定義
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProduder {

    @Autowired
    private Source source;

    @Override
    public void sendMessage(String message) {
        //向mq發送消息(並不直接操做mq,而是操做springcloud stream
        //使用source指定的output通道向外發送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}

消費者端

父工程下新建消費者子模塊lagou-cloud-stream-consumer-9091,引入jar座標和服務端同樣,這裏再也不贅述。
application.yml裏面配置與rabbit相關參數,惟一與服務者端不一樣的是input和output參數:

其餘都保持一致。
在消費端定義service類來接受消息:異步

@EnableBinding(Sink.class)
public class MessageConsumerService {

    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message){
        System.out.println("----接受到的消息---->"+message);
    }
}

測試

咱們在服務提供者端寫一個測試類來發送消息:

@SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest
{
    @Autowired
    private IMessageProduder iMessageProduder;

    @Test
    public void testSendMessage(){
        iMessageProduder.sendMessage("hello world----!!");
    }

}

咱們先啓動服務消費者,而後運行服務提供者端的測試類,看服務消費者端的控制檯輸出了接收到的信息:

Stream之消息分組

咱們將服務消費者複製一份,新消費者的端口是9092,前一個消費者端口是9091。
這樣咱們繼續測試,會發現同一個服務提供者發送的消息,被兩個消費者都接收到並進行處理了。
這明顯是有問題的,好比電商網站的訂單,確定只須要處理一次就行。

爲了解決這個問題,rabbitmq有個消息分組的概念,只要兩個消費者實例處在一個組裏,那麼這個組裏只有一個消費者會處理這個消息。

咱們僅僅須要在服務消費者端設置 spring.cloud.stream.bindings.input.group 屬性,多個消費者實例配置爲同⼀個group名稱(在同⼀個group中的多個消費者只有⼀個能夠獲取到消息並消費)。以下:

擴展:前面咱們都是先啓動服務消費者,而後再啓動服務提供者發送消息,也就是消息是臨時性的,並無持久化存儲。當咱們設置了分組以後,消息就會持久化存儲。咱們先發送消息,而後再啓動服務消費者客戶端,也可以接收到消息。

案例源碼

stream案例的源碼地址

歡迎訪問個人博客:https://www.liuyj.top

相關文章
相關標籤/搜索