Spring Cloud Alibaba 實戰(九) - Spring Cloud Stream

1 定義

  • 一個用於構建消息驅動的微服務的框架

用人話說就是 : 致力於簡化MQ通訊的框架html

2 編程模型

◆ Destination Binder (目標綁定器)spring

  • 與消息中間件通訊的組件

◆ Destination Bindings (目標綁定)
Binding是鏈接應用程序跟消息中間件的橋樑,用於消息的消費和生產,由binder建立
◆ Message(消息)
可見該編程模型異常強大,短短几行代碼,就實現了消息的對接和處理
input/output就是微服務接收和發出消息編程

下面開始對內容中心編碼segmentfault

3 編寫生產者

  • 添加依賴

  • 在啓動類添加註解

  • 寫配置

4 編寫消費者

編碼用戶中心mybatis

  • 添加依賴

  • 啓動類上添加註解

  • 寫配置

5 自定義接口

5.1 發送消息

  • 新建mysource接口

  • 啓動類註解

  • 寫配置,注意要和接口中的名字一致

  • 測試代碼

注意,因爲mybatis會掃描啓動類註解上scan註解所限制路徑下的全部接口,因此一旦有接口未被xml mapper,即拋異常,因此編碼時必須將掃描註解範圍限定死在mapper包下!

5.2 消費消息

用戶中心編碼app

  • 寫接口

  • 添註解

  • 加配置


透過現象看本質

當咱們定義好Source/Sink接口後,在啓動類使用EnableBinding指定了接口後,就會使用IOC建立對應名字的代理類,因此配置文件中也必須同名框架

消息過濾

  • 推薦閱讀

Spring Cloud Stream實現消息過濾消費分佈式

監控

記得多看端點哦!
output/input其實就是一個channel
微服務

排錯依據的重要端點

  • /actuator/bindings
  • /actuator/channels
  • /actuator/health

異常處理

  • 推薦閱讀

Spring Cloud Stream錯誤處理詳解測試

整合RocketMQ實現分佈式事務

Stream自己並未考慮分佈式事務問題,都是RocketMQ的能力

重構生產者

對內容中心一頓操做:刪除沒必要要代碼

  • 自定義的MySource接口,由於Spring內置的就已經知足咱們的需求了

  • 接着別忘了刪除啓動類中對他的引用
  • 刪除TestController中對應測試代碼
  • 清理yml中Spring消息編程模型整合RocketMQ的部分

  • myoutput刪除

  • 修正以下

代碼重構

改造ShareService

  • 即改造如下代碼(直接刪除)

  • 添加Source

  • 開始使用source發送消息,可是send只能直接發送消息(或者帶有超時)


而咱們以前使用rocketmqtemplate傳遞參數時能夠帶個arg

那如今咱們該怎麼傳arg呢???
記得前面埋下的伏筆,header也是頗有用處的!
咱們能夠將要傳的參數放入header中,以下:

rocketmqtemplate功成身退,咱們可使用stream編程模型徹底替代了

改造AddBonusTransactionListener

  • 如今這裏的arg是null了

  • 須要從header中獲取arg了(有坑,後面再說),在這裏打個斷點

  • 完善配置(IDEA沒法識別,但確實會生效),實現事務功能

  • 注意上面的group名稱要與下一致

  • 啓動內容中心
  • 發送請求

  • 發現dto實際上是字符串,並非DTO對象

  • 因此繼續修正代碼


  • 這樣就是正常的對象了


由於從header中獲取的都是字符串哦!切記!

重構消費者

對用戶中心刪除沒必要要代碼,與內容中心相似,再也不詳述

  • 刪除

  • MyTestStreamConsumer改成AddBonusStreamConsumer

重構以下


總結

  • 推薦閱讀

Spring Cloud Stream知識點盤點

參考

本文由博客一文多發平臺 OpenWrite 發佈!
相關文章
相關標籤/搜索