Spring Cloud Stream是什麼:html
Spring Cloud Stream是Spring Cloud的一個子項目,是一個能讓咱們更加方便操做MQ的框架,其目的用於構建與消息中間件鏈接的高度可伸縮的消息事件驅動的微服務java
簡單來講Spring Cloud Stream就是一個簡化了MQ操做的框架,其架構圖以下:node
Spring Cloud Stream編程模型:web
關於圖中的概念:spring
如今有一個微服務項目:content-center,該微服務做爲生產者,咱們來爲這個微服務集成Spring Cloud Stream,第一步添加stream依賴:apache
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
第二步,在啓動類上添加@EnableBinding
註解,以下:編程
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding(Source.class) ...
第三步,在配置文件中,添加與stream相關的配置項:json
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 生產者爲output output: # 用於指定topic destination: stream-test-topic
完成以上步驟後,項目就已經集成了Spring Cloud Stream,如今咱們來使用Spring Cloud Stream編寫生產者,具體代碼以下:架構
package com.zj.node.contentcenter.controller.content; import lombok.RequiredArgsConstructor; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * 生產者 * * @author 01 * @date 2019-08-10 **/ @RestController @RequiredArgsConstructor public class TestProducerController { private final Source source; @GetMapping("/test-stream") public String testStream(){ Message<String> message = MessageBuilder .withPayload("消息體") .build(); source.output() .send(message); return "send message success!"; } }
啓動項目,測試該接口是否能成功執行:app
而後爲另外一個做爲消費者的微服務項目:user-center,集成Spring Cloud Stream,因爲依賴配置是同樣的,這裏就不進行重複了,可是配置和註解裏的類須要更改一下。首先是配置以下:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 消費者爲input input: # 用於指定topic destination: stream-test-topic # rocketmq必須配置group,不然啓動會報錯 # 若是使用的是其餘MQ,則不是必須配置的 group: binder-group
啓動類的註解以下:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding(Sink.class) ...
完成集成後,使用Spring Cloud Stream編寫消費者,具體代碼以下:
package com.zj.node.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; /** * 消費者 * * @author 01 * @date 2019-08-10 **/ @Slf4j @Service public class TestStreamConsumer { @StreamListener(Sink.INPUT) public void receive(String messageBody) { log.info("經過stream收到了消息,messageBody = {}", messageBody); } }
完成代碼的編寫後啓動項目,因爲先前咱們已經經過生產者往RocketMQ投遞了消息,因此此時控制檯會輸出接收到的消息,以下:
經過以上小節的學習,咱們已經瞭解了Spring Cloud Stream的基本使用。從以上示例能夠得知,input用於綁定一個topic消費消息,output則反之,用於綁定一個topic投遞消息。
但在實際的項目中,可能會有多個topic,甚至在極端場景下,不一樣的topic可能使用不一樣的MQ實現,而stream默認提供的input和output都只能綁定一個topic,因此這個時候就須要用到stream的自定義接口來實現多個「input」和「output」綁定不一樣的topic了。
在以上小節的示例中能夠得知,生產者發送消息時使用的是Source
接口裏的output
方法,而消費者發送消息時使用的是Sink
接口裏的input
方法,而且都須要配置到啓動類的@EnableBinding
註解裏。因此實際上咱們須要自定義接口的源碼與這兩個接口的源碼幾乎一致,只是名稱有所不一樣而已,使用上也只是將Source
和Sink
改成自定義的接口便可。
接下來簡單演示一下如何自定義接口並使用,咱們基於上一小節的例子進行改造。首先是生產者,定義一個用於發送消息的接口,具體代碼以下:
package com.zj.node.contentcenter.rocketmq; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定義發送消息接口,與stream默認提供的Source源碼是相似的 * * @author 01 * @date 2019-08-10 **/ public interface MySource { /** * Name of the output channel. */ String MY_OUTPUT = "my-output"; /** * @return output channel */ @Output(MY_OUTPUT) MessageChannel output(); }
而後在啓動類的@EnableBinding
中,添加這個接口:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding({Source.class, MySource.class}) ...
在配置文件中添加以下配置:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 生產者爲output output: # 用於指定topic destination: stream-test-topic # 自定義的」output「,這裏的名稱須要與MySource接口裏的MY_OUTPUT相對應 my-output: # 綁定不一樣的topic destination: stream-my-topic
修改生產者的代碼以下便可:
package com.zj.node.contentcenter.controller.content; import com.zj.node.contentcenter.rocketmq.MySource; import lombok.RequiredArgsConstructor; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * 生產者 * * @author 01 * @date 2019-08-03 **/ @RestController @RequiredArgsConstructor public class TestProducerController { private final MySource mySource; @GetMapping("/test-stream") public String testStream(){ Message<String> message = MessageBuilder .withPayload("消息體") .build(); mySource.output() .send(message); return "send message success!"; } }
而後啓動項目訪問該接口,測試消息是否能正常發送:
改造完生產者後接着改造消費者,首先定義一個用於消費消息的接口,具體代碼以下:
package com.zj.node.usercenter.rocketmq; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定義消費消息接口,與stream默認提供的Sink源碼是相似的 * * @author 01 * @date 2019-08-10 **/ public interface MySink { /** * Input channel name. */ String MY_INPUT = "my-input"; /** * @return input channel. */ @Input(MY_INPUT) SubscribableChannel input(); }
一樣須要在啓動類的@EnableBinding
中,添加這個接口:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding({Sink.class, MySink.class}) ...
在配置文件中添加以下配置:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: # 消費者爲input input: # 用於指定topic destination: stream-test-topic # rocketmq必須配置group,不然啓動會報錯 # 若是使用的是其餘MQ,則不是必須配置的 group: binder-group # 自定義的」input「,這裏的名稱須要與MySink接口裏的MY_INPUT相對應 my-input: # 綁定不一樣的topic destination: stream-my-topic group: my-group
修改消費者的代碼以下:
package com.zj.node.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Service; /** * 消費者 * * @author 01 * @date 2019-08-10 **/ @Slf4j @Service public class TestStreamConsumer { @StreamListener(MySink.MY_INPUT) public void receive(String messageBody) { log.info("自定義接口 - 經過stream收到了消息,messageBody = {}", messageBody); } }
啓動項目,因爲先前咱們已經經過生產者往RocketMQ投遞了消息,因此此時控制檯會輸出接收到的消息,以下:
咱們都知道Spring Boot Actuator組件用於暴露監控端點,不少監控工具都須要依賴該組件的監控端點實現監控。而項目集成了Stream及Actuator後也會暴露相應的監控端點,首先須要在項目裏集成Actuator,添加依賴以下:
<!-- actuator --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
在配置文件中添加以下配置:
management: endpoints: web: exposure: # 暴露全部監控端點 include: '*' endpoint: health: # 顯示健康檢測詳情 show-details: always
訪問http://127.0.0.1:{項目端口}/actuator
能夠獲取全部暴露出來的監控端點,Stream的相關監控端點也在其列,以下圖:
/actuator/bindings
端點能夠用於查看bindings相關信息:
/actuator/channels
端點用於查看channels的相關信息,而「input」和「output」就是所謂的channel,能夠認爲這些channel是topic的抽象:
在/actuator/health
端點中能夠查看binder及RocketMQ的狀態,主要是用於查看MQ的鏈接狀況,若是鏈接不上其status則爲DOWN:
先前在Spring Cloud Alibaba RocketMQ - 構建異步通訊的微服務一文的末尾中,咱們介紹了RocketMQ的事務消息而且也演示瞭如何編碼實現。在本文學習了Spring Cloud Stream以後,咱們來結合Stream對以前實現事務消息的代碼進行重構。
首先修改配置文件以下:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: output: producer: # 開啓事務消息,這樣經過output這個channel發送的消息都是半消息 transactional: true # 生產者所在的事務組名稱 group: tx-test-producer-group bindings: # 生產者爲output output: # 用於指定topic destination: stream-test-topic
而後重構TestProducerService
,具體代碼以下:
package com.zj.node.contentcenter.service.test; import com.alibaba.fastjson.JSON; import com.zj.node.contentcenter.dao.content.NoticeMapper; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.UUID; /** * @author 01 * @date 2019-08-08 **/ @Service @RequiredArgsConstructor public class TestProducerService { private final NoticeMapper noticeMapper; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; private final Source source; public String testSendMsg(Notice notice) { // 生成事務id String transactionId = UUID.randomUUID().toString(); // 經過stream發送消息,這裏實際發送的就是半消息 source.output().send( MessageBuilder.withPayload("消息體") // header是消息的頭部分,能夠用做傳參 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("notice_id", notice.getId()) // 對象須要轉換成json,不然默認是調用對象的toString方法轉換爲字符串 .setHeader("notice", JSON.toJSONString(notice)) .build() ); return "send message success"; } @Transactional(rollbackFor = Exception.class) public void updateNotice(Integer noticeId, Notice notice) { Notice newNotice = new Notice(); newNotice.setId(noticeId); newNotice.setContent(notice.getContent()); noticeMapper.updateByPrimaryKeySelective(newNotice); } @Transactional(rollbackFor = Exception.class) public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) { updateNotice(noticeId, notice); // 寫入事務日誌 rocketmqTransactionLogMapper.insertSelective( RocketmqTransactionLog.builder() .transactionId(transactionId) .log("updateNotice") .build() ); } }
最後是重構TestTransactionListener
,具體代碼以下:
package com.zj.node.contentcenter.rocketmq; import com.alibaba.fastjson.JSON; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import com.zj.node.contentcenter.service.test.TestProducerService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * 本地事務監聽器 * * @author 01 * @date 2019-08-08 **/ @Slf4j @RequiredArgsConstructor // 這裏的txProducerGroup須要與配置文件裏配置的一致 @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group") public class TestTransactionListener implements RocketMQLocalTransactionListener { private final TestProducerService service; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; /** * 用於執行本地事務的方法 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.info("執行本地事務方法. 事務id: {}", transactionId); Integer noticeId = Integer.parseInt((String) headers.get("notice_id")); // 因爲從header裏獲取的對象是json格式因此須要進行轉換 Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class); try { // 執行帶有事務註解的方法 service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId); // 正常執行向MQ Server發送commit消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事務方法發生異常,消息將被回滾", e); // 發生異常向MQ Server發送rollback消息 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 用於回查本地事務的執行結果 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.warn("回查本地事務狀態. 事務id: {}", transactionId); // 按事務id查詢日誌數據 RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne( RocketmqTransactionLog.builder() .transactionId(transactionId) .build() ); // 若是能按事務id查詢出來數據表示本地事務執行成功,沒有數據則表示本地事務執行失敗 if (transactionLog == null) { log.warn("本地事務執行失敗,事務日誌不存在,消息將被回滾. 事務id: {}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; } return RocketMQLocalTransactionState.COMMIT; } }
擴展文章: