Spring Cloud Stream學習筆記

1 環境

2 簡介

Spring Cloud Stream是一個用於構建消息驅動的微服務應用的框架,其提供的一系列抽象屏蔽了不一樣類型消息中間件使用上的差別,同時也大大簡化了Spring在整合消息中間件時的使用複雜度。html

Spring Cloud Stream 提供了Binder(負責與消息中間件進行交互)java

3 初見

1 建立項目 添加web rabbitmq stream依賴

在這裏插入圖片描述

2 rabbitmq配置

# 其餘參數默認配置
spring.rabbitmq.host=你的host

3 消息接收器

// 該註解表示綁定Sink消息通道
@EnableBinding(Sink.class)
public class MsgReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);

    // 自帶 消費者
    @StreamListener(Sink.INPUT)
    public void receive(Object payload){
        logger.info("received: " + payload);
    }

}

4 在rabbitmq中發送消息

在這裏插入圖片描述

5 查看結果

在這裏插入圖片描述

4 自定義消息通道

1 自定義接口

public interface MyChannel {
    String INPUT = "test-input";
    String OUTPUT = "test-output";

    // 收
    @Input(INPUT)
    SubscribableChannel input();

    // 發
    @Output(OUTPUT)
    MessageChannel output();
}

2 自定義接收器

// 綁定自定義消息通道
@EnableBinding(MyChannel.class)
public class MsgReceiver1 {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);

    // 收
    @StreamListener(MyChannel.INPUT)
    public void receive(Object payload){
        logger.info("received1: " + payload + ":" + new Date());
    }

}

3 controller進行測試

package com.sundown.stream.controller;

import com.sundown.stream.bean.ChatMessage;
import com.sundown.stream.msg.MyChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

@RestController
public class HelloController {

    @Autowired
    MyChannel myChannel;

    @GetMapping("/hello")
    public void hello(){
        String message = "welcome spring cloud stream";
 myChannel.output().send(MessageBuilder.withPayload(message).build());
    }
}

4 消息輸入輸出(通道對接)

spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic

5 啓動、訪問

在這裏插入圖片描述
在這裏插入圖片描述

5 消息分組

  • 消息分組(肥水不留外人田 你可能不知道流向哪家田 可是確實是本身人)

1 打包 訪問(未使用消息分組)

在這裏插入圖片描述

  • 啓動java -jar stream-0.0.1-SNAPSHOT.jar
    java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081運行訪問http://localhost:8080/hello
  • 結果(屢次消費)
    在這裏插入圖片描述在這裏插入圖片描述
  • 如今我不想一條消息被屢次消費(假設消費者是一個集羣 --> 多我的作同一件事 題外話:分佈式 --> 一件事分給多我的作) 是否有什麼辦法呢
    消息分組幫咱們解決(指定輸入 輸出 有麼有負載均衡的味道)

2 消息分組配置

spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic

spring.cloud.stream.bindings.test-input.group=gg
spring.cloud.stream.bindings.test-output.group=gg
  • 爲了驗證是否能成功 從新打包運行 和上面同樣 訪問接口
    在這裏插入圖片描述
    在這裏插入圖片描述
  • 清空2個控制檯的信息 再次訪問接口
    在這裏插入圖片描述
    在這裏插入圖片描述

6 消息分區

  • 爲一些具備相同特徵的消息設置每次都被同一個消費實例進行消費。

1 消息分區配置

  • properties配置
spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic

spring.cloud.stream.bindings.test-input.group=gg
spring.cloud.stream.bindings.test-output.group=gg

# 開啓消費分區(消費者上配置)
spring.cloud.stream.bindings.test-input.consumer.partitioned=true
# 消費者實例個數(消費者上配置)
spring.cloud.stream.instance-count=2
# 當前實例下標(消費者上配置)
spring.cloud.stream.instance-index=0

2 controller配置

@RestController
public class HelloController {

    @Autowired
    MyChannel myChannel;

    @GetMapping("/hello")
    public void hello(){
        String message = "welcome spring cloud stream";
        // 先寫死
        int whichPart = 1;
        System.out.println("發送消息:" + message + ",發往分區:" + whichPart);
        myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
    }
}

3 訪問

  • 打包運行java -jar stream-0.0.1-SNAPSHOT.jar --spring.cloud.stream.instance-index=0
    java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --spring.cloud.stream.instance-index=0(別忘了先關閉啓動類 否則打包會報錯)
    -訪問http://localhost:8080/hello
    在這裏插入圖片描述

4 如果隨機訪問呢

@GetMapping("/hello")
    public void hello(){
        String message = "welcome spring cloud stream";
        int whichPart = new Random().nextInt(2);
        System.out.println("發送消息:" + message + ",發往分區:" + whichPart);
        myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
    }
  • 和上面同樣打包訪問
    在這裏插入圖片描述

7 定時器

雖然定時任務能夠用cron表達式 可是對於一些特殊的定時任務 可使用stream+rabbitmq更合適 好比幾分鐘後執行
rabbitmq插件安裝web

1 配置

  • properties
spring.rabbitmq.host=xxx

spring.cloud.stream.bindings.test-input.destination=topic
spring.cloud.stream.bindings.test-output.destination=topic

spring.cloud.stream.rabbit.bindings.test-input.consumer.delayed-exchange=true
spring.cloud.stream.rabbit.bindings.test-output.producer.delayed-exchange=true

#spring.cloud.stream.bindings.test-input.destination=test-topic
#spring.cloud.stream.bindings.test-output.destination=test-topic
#
#spring.cloud.stream.bindings.test-input.group=gg
#spring.cloud.stream.bindings.test-output.group=gg
#
## 開啓消費分區(消費者上配置)
#spring.cloud.stream.bindings.test-input.consumer.partitioned=true
## 消費者實例個數(消費者上配置)
#spring.cloud.stream.instance-count=2
## 當前實例下標(消費者上配置)
#spring.cloud.stream.instance-index=0
#
## 生產者配置
#spring.cloud.stream.bindings.test-output.producer.partition-key-expression=headers['whichPart']
## 消費節點數量
#spring.cloud.stream.bindings.test-output.producer.partition-count=2
  • 自定義通道
// 綁定自定義消息通道
@EnableBinding(MyChannel.class)
public class MsgReceiver1 {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);

    // 收
    @StreamListener(MyChannel.INPUT)
    public void receive(Object payload){
        // 添加日期 一會好對比
        logger.info("received1: " + payload + ":" + new Date());
    }

}
  • controller
@RestController
public class HelloController {
    private static final Logger logger = LoggerFactory.getLogger(HelloController.class);

    @Autowired
    MyChannel myChannel;

    @GetMapping("/delay")
    public void delay(){
        String message = "welcome spring cloud stream";
        logger.info("send msg:" + new Date());
        // x-delay --> 延遲3s
        myChannel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 3000).build());
    }
}

2 啓動 訪問

在這裏插入圖片描述

  • 打開rabbitmq查看
    在這裏插入圖片描述
  • 查看idea控制檯
    在這裏插入圖片描述

8 小結

stream自帶的與自定義(添加destination=xxx)之間的相似和區別
解決重複消費 分組(group)
消息分組單個實例訪問(開啓消費分區 實例個數 實例下標 生產者配置 消費節點數)
定時器 rabbitmq相關的插件安裝運行 後端代碼實現(配置delayed-exchange和destination以及controller 發送時添加setHeader("x-delay", 3000) 3s延時)spring

相關文章
相關標籤/搜索