SpringCloud Stream是一個用於構建消息驅動的微服務應用框架。它經過注入,輸入、輸出通道來與外界通訊;所以它很容易實現消息的中轉,而且在更換消息中間件的時候不須要該代碼,僅須要修改配置便可。支持的消息中間件如RabbitMQ、Kafka等等。spring
一、增長maven依賴app
1 <dependency> 2 <groupId>org.springframework.cloud</groupId> 3 <artifactId>spring-cloud-stream</artifactId> 4 </dependency> 5 <dependency> 6 <groupId>org.springframework.cloud</groupId> 7 <artifactId>spring-cloud-stream-binder-rabbit</artifactId> 8 </dependency> 9 <dependency> 10 <groupId>org.springframework.cloud</groupId> 11 <artifactId>spring-cloud-stream-test-support</artifactId> 12 <scope>test</scope> 13 </dependency>
二、增長properties配置框架
1 spring.application.name=stream 2 server.port=7070 3 4 # rabbitmq 5 spring.rabbitmq.host=localhost 6 spring.rabbitmq.port=5672 7 spring.rabbitmq.username=guest 8 spring.rabbitmq.password=guest 9 10 # stream 11 spring.cloud.stream.bindings.input.destination=customer 12 spring.cloud.stream.bindings.output.destination=customer
三、啓動類加上本工程的消息代理類型maven
@EnableBinding({Processor.class})
@EnableBinding分爲三種類型微服務
)org.springframework.cloud.stream.messaging.Processor:接收和發送消息ui
)org.springframework.cloud.stream.messaging.Source:僅支持發送消息spa
)org.springframework.cloud.stream.messaging.Sink:僅支持接收消息代理
四、加上Controller及Servicecode
1 @RestController 2 @AllArgsConstructor 3 public class ProcessorController { 4 5 private final ProcessorService processorService; 6 7 @GetMapping("/testProcessor/{message}") 8 public boolean testProcessor(@PathVariable("message") String message) { 9 return processorService.send(message); 10 } 11 }
1 @Service 2 @AllArgsConstructor 3 public class ProcessorService { 4 5 private final Processor processor; 6 7 public boolean send(String message) { 8 return processor.output().send(MessageBuilder.withPayload(message).build()); 9 } 10 11 public boolean subscribe(MessageHandler handler) { 12 return processor.input().subscribe(handler); 13 } 14 }
五、在任意bean下寫上接收邏輯或另起一個工程(另外一個工程的mq須要配成一個哦)server
1 @StreamListener(Sink.INPUT) 2 public void receive(String message) { 3 System.err.println("receive message: " + message); 4 }
而後咱們啓動項目,訪問http://localhost:7070/testProcessor/hello,此時就會在控制檯看到receive message: hello的字樣。
1 @GetMapping("/testMessageShunt/{type}") 2 public boolean testMessageShunt(@PathVariable("type") String type) { 3 String header = "a".equalsIgnoreCase(type) ? "msg1" : "msg2"; 4 return processorService.send(type, header); 5 }
1 /** 2 * RabbitMQ不支持消息分流 3 */ 4 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs1'") 5 public void receiveMessage1(@Payload Message<String> message) { 6 System.err.println("receive message1: " + message.getPayload()); 7 } 8 9 /** 10 * RabbitMQ不支持消息分流 11 */ 12 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs2'") 13 public void receiveMessage2(@Payload Message<String> message) { 14 System.err.println("receive message2: " + message.getPayload()); 15 }