Spring Cloud Stream - 構建消息事件驅動的微服務

Spring Cloud Stream簡介

Spring Cloud Stream是什麼:html

Spring Cloud Stream是Spring Cloud的一個子項目,是一個能讓咱們更加方便操做MQ的框架,其目的用於構建與消息中間件鏈接的高度可伸縮的消息事件驅動的微服務java

簡單來講Spring Cloud Stream就是一個簡化了MQ操做的框架,其架構圖以下:
Spring Cloud Stream - 構建消息事件驅動的微服務node

  • 圖片來自官方文檔,從圖中能夠看到應用經過input和output與Binder進行交互,而Binder是一個讓咱們的微服務與MQ集成的組件。圖中的Middleware便是消息中間件,目前支持Kafka、RabbitMQ以及RocketMQ

Spring Cloud Stream編程模型:
Spring Cloud Stream - 構建消息事件驅動的微服務web

  • 圖片來自官方文檔,微服務(Application)集成了Stream後,Stream的Destination Binder會建立兩個Binding,左邊的Binding鏈接着RabbitMQ,右邊的Binding鏈接着Kafka。左邊的Binding從RabbitMQ消費消息,而後通過圖中代碼的處理後,把處理結果經過右邊的Binding投遞到Kafka。簡單來講,就是這個微服務消費了RabbitMQ裏的消息並對其進行處理,最後將處理的結果投遞到Kafka中。Input和Output是消息相對與微服務的走向,input表示微服務接收消息,output表示微服務投遞消息或發送消息

關於圖中的概念:spring

  • Destination Binder(目標綁定器):與消息中間件通訊的組件,用於實現消息的消費和投遞
  • Destination Bindings(目標綁定):Binding是鏈接應用程序跟消息中間件的橋樑,用於消息的消費和生產,由binder建立

使用Spring Cloud Stream

如今有一個微服務項目:content-center,該微服務做爲生產者,咱們來爲這個微服務集成Spring Cloud Stream,第一步添加stream依賴:apache

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
  • Tips:該項目的Spring Cloud版本爲:Greenwich.SR1;Spring Cloud Alibaba版本爲:2.1.0.RELEASE

第二步,在啓動類上添加@EnableBinding註解,以下:編程

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
...

第三步,在配置文件中,添加與stream相關的配置項:json

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 生產者爲output
        output:
          # 用於指定topic
          destination: stream-test-topic

完成以上步驟後,項目就已經集成了Spring Cloud Stream,如今咱們來使用Spring Cloud Stream編寫生產者,具體代碼以下:架構

package com.zj.node.contentcenter.controller.content;

import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生產者
 *
 * @author 01
 * @date 2019-08-10
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    private final Source source;

    @GetMapping("/test-stream")
    public String testStream(){
        Message<String> message = MessageBuilder
                .withPayload("消息體")
                .build();
        source.output()
                .send(message);

        return "send message success!";
    }
}

啓動項目,測試該接口是否能成功執行:
Spring Cloud Stream - 構建消息事件驅動的微服務app


而後爲另外一個做爲消費者的微服務項目:user-center,集成Spring Cloud Stream,因爲依賴配置是同樣的,這裏就不進行重複了,可是配置和註解裏的類須要更改一下。首先是配置以下:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 消費者爲input
        input:
          # 用於指定topic
          destination: stream-test-topic
          # rocketmq必須配置group,不然啓動會報錯
          # 若是使用的是其餘MQ,則不是必須配置的
          group: binder-group

啓動類的註解以下:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
...

完成集成後,使用Spring Cloud Stream編寫消費者,具體代碼以下:

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

/**
 * 消費者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String messageBody) {
        log.info("經過stream收到了消息,messageBody = {}", messageBody);
    }
}

完成代碼的編寫後啓動項目,因爲先前咱們已經經過生產者往RocketMQ投遞了消息,因此此時控制檯會輸出接收到的消息,以下:
Spring Cloud Stream - 構建消息事件驅動的微服務


Spring Cloud Stream自定義接口

經過以上小節的學習,咱們已經瞭解了Spring Cloud Stream的基本使用。從以上示例能夠得知,input用於綁定一個topic消費消息,output則反之,用於綁定一個topic投遞消息。

但在實際的項目中,可能會有多個topic,甚至在極端場景下,不一樣的topic可能使用不一樣的MQ實現,而stream默認提供的input和output都只能綁定一個topic,因此這個時候就須要用到stream的自定義接口來實現多個「input」和「output」綁定不一樣的topic了。

在以上小節的示例中能夠得知,生產者發送消息時使用的是Source接口裏的output方法,而消費者發送消息時使用的是Sink接口裏的input方法,而且都須要配置到啓動類的@EnableBinding註解裏。因此實際上咱們須要自定義接口的源碼與這兩個接口的源碼幾乎一致,只是名稱有所不一樣而已,使用上也只是將SourceSink改成自定義的接口便可。

接下來簡單演示一下如何自定義接口並使用,咱們基於上一小節的例子進行改造。首先是生產者,定義一個用於發送消息的接口,具體代碼以下:

package com.zj.node.contentcenter.rocketmq;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 自定義發送消息接口,與stream默認提供的Source源碼是相似的
 *
 * @author 01
 * @date 2019-08-10
 **/
public interface MySource {

    /**
     * Name of the output channel.
     */
    String MY_OUTPUT = "my-output";

    /**
     * @return output channel
     */
    @Output(MY_OUTPUT)
    MessageChannel output();
}

而後在啓動類的@EnableBinding中,添加這個接口:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding({Source.class, MySource.class})
...

在配置文件中添加以下配置:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 生產者爲output
        output:
          # 用於指定topic
          destination: stream-test-topic
        # 自定義的」output「,這裏的名稱須要與MySource接口裏的MY_OUTPUT相對應  
        my-output:
          # 綁定不一樣的topic
          destination: stream-my-topic

修改生產者的代碼以下便可:

package com.zj.node.contentcenter.controller.content;

import com.zj.node.contentcenter.rocketmq.MySource;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生產者
 *
 * @author 01
 * @date 2019-08-03
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    private final MySource mySource;

    @GetMapping("/test-stream")
    public String testStream(){
        Message<String> message = MessageBuilder
                .withPayload("消息體")
                .build();
        mySource.output()
                .send(message);

        return "send message success!";
    }
}

而後啓動項目訪問該接口,測試消息是否能正常發送:
Spring Cloud Stream - 構建消息事件驅動的微服務


改造完生產者後接着改造消費者,首先定義一個用於消費消息的接口,具體代碼以下:

package com.zj.node.usercenter.rocketmq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定義消費消息接口,與stream默認提供的Sink源碼是相似的
 *
 * @author 01
 * @date 2019-08-10
 **/
public interface MySink {

    /**
     * Input channel name.
     */
    String MY_INPUT = "my-input";

    /**
     * @return input channel.
     */
    @Input(MY_INPUT)
    SubscribableChannel input();
}

一樣須要在啓動類的@EnableBinding中,添加這個接口:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding({Sink.class, MySink.class})
...

在配置文件中添加以下配置:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        # 消費者爲input
        input:
          # 用於指定topic
          destination: stream-test-topic
          # rocketmq必須配置group,不然啓動會報錯
          # 若是使用的是其餘MQ,則不是必須配置的
          group: binder-group
        # 自定義的」input「,這裏的名稱須要與MySink接口裏的MY_INPUT相對應    
        my-input:
          # 綁定不一樣的topic
          destination: stream-my-topic
          group: my-group

修改消費者的代碼以下:

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

/**
 * 消費者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(MySink.MY_INPUT)
    public void receive(String messageBody) {
        log.info("自定義接口 - 經過stream收到了消息,messageBody = {}", messageBody);
    }
}

啓動項目,因爲先前咱們已經經過生產者往RocketMQ投遞了消息,因此此時控制檯會輸出接收到的消息,以下:
Spring Cloud Stream - 構建消息事件驅動的微服務


Spring Cloud Stream的監控

咱們都知道Spring Boot Actuator組件用於暴露監控端點,不少監控工具都須要依賴該組件的監控端點實現監控。而項目集成了Stream及Actuator後也會暴露相應的監控端點,首先須要在項目裏集成Actuator,添加依賴以下:

<!-- actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

在配置文件中添加以下配置:

management:
  endpoints:
    web:
      exposure:
        # 暴露全部監控端點
        include: '*'
  endpoint:
    health:
      # 顯示健康檢測詳情
      show-details: always

訪問http://127.0.0.1:{項目端口}/actuator能夠獲取全部暴露出來的監控端點,Stream的相關監控端點也在其列,以下圖:
Spring Cloud Stream - 構建消息事件驅動的微服務

/actuator/bindings端點能夠用於查看bindings相關信息:
Spring Cloud Stream - 構建消息事件驅動的微服務

/actuator/channels端點用於查看channels的相關信息,而「input」和「output」就是所謂的channel,能夠認爲這些channel是topic的抽象:
Spring Cloud Stream - 構建消息事件驅動的微服務

/actuator/health端點中能夠查看binder及RocketMQ的狀態,主要是用於查看MQ的鏈接狀況,若是鏈接不上其status則爲DOWN:
Spring Cloud Stream - 構建消息事件驅動的微服務


Spring Cloud Stream + RocketMQ實現事務消息

先前在Spring Cloud Alibaba RocketMQ - 構建異步通訊的微服務一文的末尾中,咱們介紹了RocketMQ的事務消息而且也演示瞭如何編碼實現。在本文學習了Spring Cloud Stream以後,咱們來結合Stream對以前實現事務消息的代碼進行重構。

首先修改配置文件以下:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
        bindings:
          output:
            producer:
              # 開啓事務消息,這樣經過output這個channel發送的消息都是半消息
              transactional: true
              # 生產者所在的事務組名稱
              group: tx-test-producer-group
      bindings:
        # 生產者爲output
        output:
          # 用於指定topic
          destination: stream-test-topic

而後重構TestProducerService,具體代碼以下:

package com.zj.node.contentcenter.service.test;

import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

/**
 * @author 01
 * @date 2019-08-08
 **/
@Service
@RequiredArgsConstructor
public class TestProducerService {

    private final NoticeMapper noticeMapper;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    private final Source source;

    public String testSendMsg(Notice notice) {
        // 生成事務id
        String transactionId = UUID.randomUUID().toString();
        // 經過stream發送消息,這裏實際發送的就是半消息
        source.output().send(
                MessageBuilder.withPayload("消息體")
                        // header是消息的頭部分,能夠用做傳參
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .setHeader("notice_id", notice.getId())
                        // 對象須要轉換成json,不然默認是調用對象的toString方法轉換爲字符串
                        .setHeader("notice", JSON.toJSONString(notice))
                        .build()
        );

        return "send message success";
    }

    @Transactional(rollbackFor = Exception.class)
    public void updateNotice(Integer noticeId, Notice notice) {
        Notice newNotice = new Notice();
        newNotice.setId(noticeId);
        newNotice.setContent(notice.getContent());

        noticeMapper.updateByPrimaryKeySelective(newNotice);
    }

    @Transactional(rollbackFor = Exception.class)
    public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
        updateNotice(noticeId, notice);
        // 寫入事務日誌
        rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("updateNotice")
                        .build()
        );
    }
}

最後是重構TestTransactionListener,具體代碼以下:

package com.zj.node.contentcenter.rocketmq;

import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * 本地事務監聽器
 *
 * @author 01
 * @date 2019-08-08
 **/
@Slf4j
@RequiredArgsConstructor
// 這裏的txProducerGroup須要與配置文件裏配置的一致
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {

    private final TestProducerService service;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 用於執行本地事務的方法
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("執行本地事務方法. 事務id: {}", transactionId);
        Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
        // 因爲從header裏獲取的對象是json格式因此須要進行轉換
        Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class);

        try {
            // 執行帶有事務註解的方法
            service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId);
            // 正常執行向MQ Server發送commit消息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事務方法發生異常,消息將被回滾", e);
            // 發生異常向MQ Server發送rollback消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 用於回查本地事務的執行結果
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.warn("回查本地事務狀態. 事務id: {}", transactionId);

        // 按事務id查詢日誌數據
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );

        // 若是能按事務id查詢出來數據表示本地事務執行成功,沒有數據則表示本地事務執行失敗
        if (transactionLog == null) {
            log.warn("本地事務執行失敗,事務日誌不存在,消息將被回滾. 事務id: {}", transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

擴展文章:

相關文章
相關標籤/搜索