Spring Cloud Stream 簡單使用

引入依賴

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' -->
        </dependency>
複製代碼

消息生產/消費

生產者

首先定義一個接口:java

/** * 使用 stream 首先定義一個接口 */
public interface StreamClient {

    String MSG = "myStreamMsg"; //發送消息的隊列名稱

    String RETURN_MSG = "returnMsg"; //消費消息成功返回消息的隊列名稱
    @Output(MSG)
    MessageChannel output();

    @Input(RETURN_MSG)
    SubscribableChannel input();
    
    // MessageChannel 發送消息類型
    // SubscribableChannel 訂閱消息類型
}
複製代碼

生產者:git

/* 這裏採用的 controoller 的方式來測試發送消息 */
@RestController
@EnableBinding(StreamClient.class)  // 這裏必需要綁定接口,否者就會啓動報錯,查找不到 streamClient
public class StreamSenderController {

    @Autowired
    private StreamClient streamClient;

     // 測試 stream 發送方法

/* @GetMapping("sendMsg") public void send() { streamClient.output().send(MessageBuilder.withPayload(new Date().toString()).build()); } @GetMapping("sendDate") public void send1() { streamClient.output().send(MessageBuilder.withPayload(new Date()).build()); }*/
    @GetMapping("sendPerson")
    public void send2() {
        Person person = new Person();
        person.setName("Berg");
        person.setSex("man");
        streamClient.output().send(MessageBuilder.withPayload(person).build()); // 發送消息
    }

    @StreamListener(value = StreamClient.RETURN_MSG)
    public void returnMsg(String msg) {
        System.out.println("reurn message:"+msg);
    } // 消費消息後的返回
}
複製代碼

消費者

首先定義一個接口:github

/**java
 * 使用 stream 首先定義一個接口
 */
public interface StreamClient {

    String MSG = "myStreamMsg";  // 消費消息隊列

    String RETURN_MSG = "returnMsg"; // 返回隊列
    @Input(MSG)
    SubscribableChannel input();

    @Output(RETURN_MSG)
    MessageChannel output();

}
複製代碼

消費者:spring

@Component
@EnableBinding(StreamClient.class)  // 這裏一樣須要綁定接口
public class StreamReceiver {
    private final Logger logger = LoggerFactory.getLogger(StreamReceiver.class);

/* @StreamListener(value = "myStreamMsg") public void receive(String msg) { logger.info("receive:{}", msg); } @StreamListener(value = "myStreamMsg") public void receive1(Date date) { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); logger.info("receive:{}", dateFormat.format(date)); }*/

    @StreamListener(StreamClient.MSG)  // 監聽消費消息隊列
    @SendTo(StreamClient.RETURN_MSG) // 返回的消息隊列
    public String receive2(Person p) {
        logger.info("name:{}",p.getName());
        logger.info("sex:{}",p.getSex());
        return "received message";
    }
}
複製代碼

問題

重複訂閱問題:

一般在生產環境,咱們的每一個服務都不會以單節點的方式運行在生產環境,當同一個服務啓動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認狀況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每一個消費者實例接收和處理(出現上述重複消費問題)。可是有些業務場景之下,咱們但願生產者產生的消息只被其中一個實例消費,這個時候咱們須要爲這些消費者設置消費組來實現這樣的功能。json

解決方案: 在消費者配置文件中添加以下配置bash

spring:
  cloud:
    stream:
      bindings:
        myStreamMsg: # 消息隊列名稱
          group: one # 若是這裏不定義分組,就會出現多個實例重複消費的問題
複製代碼

消息類型問題:

在發送消息時有可能時對象,也有可能時文本。所以不一樣類型之間的轉換可能會出現問題。默認爲 json 類型 解決方案: 在生產者配置文件中配置消息類型app

spring:
    stream:
      bindings:
        myStreamMsg:
          Content-Type: application/json  // 配置類型
複製代碼

代碼

github地址測試

相關文章
相關標籤/搜索