在這裏插入圖片描述html
Spring Cloud Stream 是用於構建與消息傳遞系統的高度可伸縮的事件驅動微服務架構,該框架提供來一個靈活的編程模型,它簡歷在已經創建和熟悉的Spring 熟語和最佳實踐上,包括支持持久化的發佈訂閱、消費組以及消息分區這三個核心概念web
API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/spring
中文指導手冊:https://m.wang1314.com/doc/webapp/topic/20971999.html編程
標準MQjson
Biinder服務器
INPUT 對應於消費者架構
OUTPUT 對應於生產者app
在沒有綁定器這個概念的狀況下,咱們springBoot 應用要直接與消息中間件進行交互的時候,因爲消息中間件構造不一樣,他們的實現細節上會有較大的差別性,經過定義綁定器做爲中間層,完美地實現了應用與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel 通道,使得應用程序不須要再考慮各類不一樣的消息中間件實現框架
經過綁定器Binder做爲中間層,實現了應用程序與消息中間件的隔離dom
Spring Cloud Stream標準流程套路
在這裏插入圖片描述
<dependencies> <!--stream rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--監控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--熱部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency></dependencies>
server: port: 8801
spring: application: name: cloud-stream-provider cloud: stream: binders: #在此處配置要綁定的rabbitmq的服務信息 defaultRabbit: #表示定義的名稱,用於binding整合 type: rabbit #消息組件類型 environment: #設置rabbitmq的相關環境配置 spring: rabbitmq: host: ip端口號 #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址 port: 5672 username: guest password: guest bindings: #服務的整合處理 output: #這個名字是一個通道的名稱 destination: studyExchange #表示要使用的Exchange名稱定義 content-type: application/json #設置消息類型,本次爲json binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)
eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S) lease-expiration-duration-in-seconds: 5 #若是超過5S間隔就註銷節點 默認是90s instance-id: send-8801.com #在信息列表時顯示主機名稱 prefer-ip-address: true #訪問的路徑變爲IP地址
啓動類
@SpringBootApplicationpublic class StreamMQMain8801 {
public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); }
}
新建service包 --- IMessageProvider接口 與實現類
public interface IMessageProvider { public String send();}
@EnableBinding(Source.class) //定義消息的推送管道(Source是spring的)public class IMessageProviderImpl implements IMessageProvider {
@Resource private MessageChannel output; //消息發送管道
@Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); //MessageBuilder是spring的integration.support.MessageBuilder System.out.println("*******serial: " + serial); return null; }}
@RestControllerpublic class SendMessageController {
@Resource private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage") public String sendMessage(){ return iMessageProvider.send(); }
}
<dependencies> <!--stream rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--監控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--熱部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency></dependencies>
server: port: 8802
spring: application: name: cloud-stream-provider cloud: stream: binders: #在此處配置要綁定的rabbitmq的服務信息 defaultRabbit: #表示定義的名稱,用於binding整合 type: rabbit #消息組件類型 environment: #設置rabbitmq的相關環境配置 spring: rabbitmq: host: ip端口 #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址 port: 5672 username: 用戶名 password: 密碼 bindings: #服務的整合處理 input: #這個名字是一個通道的名稱 destination: studyExchange #表示要使用的Exchange名稱定義 content-type: application/json #設置消息類型,本次爲json,本文要設置爲「text/plain」 binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)
eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S) lease-expiration-duration-in-seconds: 5 #若是超過5S間隔就註銷節點 默認是90s instance-id: receive-8802.com #在信息列表時顯示主機名稱 prefer-ip-address: true #訪問的路徑變爲IP地址
@SpringBootApplicationpublic class StreamMQMain8802 {
public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); }
}
@EnableBinding(Sink.class)@Controllerpublic class ReceiveMessageListenerController {
@Value("${server.port}") private String serverPort;
@StreamListener(Sink.INPUT) //監聽 public void input(Message<String> message){ System.out.println("消費者1號------>收到的消息:" + message.getPayload() + "\t port:" + serverPort); }
}
啓動Eureka 消息驅動生產者,消費者
在這裏插入圖片描述
在這裏插入圖片描述