SpringCloud學習筆記(9、SpringCloud Stream)

目錄:

  • 什麼是SpringCloud Stream
  • 如何使用SpringCloud Stream
  • 消息分流

什麼是SpringCloud Stream:

SpringCloud Stream是一個用於構建消息驅動的微服務應用框架。它經過注入,輸入、輸出通道來與外界通訊;所以它很容易實現消息的中轉,而且在更換消息中間件的時候不須要該代碼,僅須要修改配置便可。支持的消息中間件如RabbitMQ、Kafka等等。spring

如何使用SpringCloud Stream(以RabbitMQ爲例):

一、增長maven依賴app

 1 <dependency>
 2     <groupId>org.springframework.cloud</groupId>
 3     <artifactId>spring-cloud-stream</artifactId>
 4 </dependency>
 5 <dependency>
 6     <groupId>org.springframework.cloud</groupId>
 7     <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
 8 </dependency>
 9 <dependency>
10     <groupId>org.springframework.cloud</groupId>
11     <artifactId>spring-cloud-stream-test-support</artifactId>
12     <scope>test</scope>
13 </dependency>

二、增長properties配置框架

 1 spring.application.name=stream
 2 server.port=7070
 3 
 4 # rabbitmq
 5 spring.rabbitmq.host=localhost
 6 spring.rabbitmq.port=5672
 7 spring.rabbitmq.username=guest
 8 spring.rabbitmq.password=guest
 9 
10 # stream
11 spring.cloud.stream.bindings.input.destination=customer
12 spring.cloud.stream.bindings.output.destination=customer

三、啓動類加上本工程的消息代理類型maven

@EnableBinding({Processor.class})

@EnableBinding分爲三種類型微服務

)org.springframework.cloud.stream.messaging.Processor:接收和發送消息ui

)org.springframework.cloud.stream.messaging.Source:僅支持發送消息spa

)org.springframework.cloud.stream.messaging.Sink:僅支持接收消息代理

四、加上Controller及Servicecode

 1 @RestController
 2 @AllArgsConstructor
 3 public class ProcessorController {
 4 
 5     private final ProcessorService processorService;
 6 
 7     @GetMapping("/testProcessor/{message}")
 8     public boolean testProcessor(@PathVariable("message") String message) {
 9         return processorService.send(message);
10     }
11 }
 1 @Service
 2 @AllArgsConstructor
 3 public class ProcessorService {
 4 
 5     private final Processor processor;
 6 
 7     public boolean send(String message) {
 8         return processor.output().send(MessageBuilder.withPayload(message).build());
 9     }
10 
11     public boolean subscribe(MessageHandler handler) {
12         return processor.input().subscribe(handler);
13     }
14 }

五、在任意bean下寫上接收邏輯或另起一個工程(另外一個工程的mq須要配成一個哦)server

1 @StreamListener(Sink.INPUT)
2 public void receive(String message) {
3     System.err.println("receive message: " + message);
4 }

而後咱們啓動項目,訪問http://localhost:7070/testProcessor/hello,此時就會在控制檯看到receive message: hello的字樣。

消息分流(kafka特性):

1 @GetMapping("/testMessageShunt/{type}")
2 public boolean testMessageShunt(@PathVariable("type") String type) {
3     String header = "a".equalsIgnoreCase(type) ? "msg1" : "msg2";
4     return processorService.send(type, header);
5 }
 1 /**
 2  * RabbitMQ不支持消息分流
 3  */
 4 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs1'")
 5 public void receiveMessage1(@Payload Message<String> message) {
 6     System.err.println("receive message1: " + message.getPayload());
 7 }
 8 
 9 /**
10  * RabbitMQ不支持消息分流
11  */
12 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs2'")
13 public void receiveMessage2(@Payload Message<String> message) {
14     System.err.println("receive message2: " + message.getPayload());
15 }
相關文章
相關標籤/搜索