Spring Cloud Stream是一個用於構建消息驅動的微服務應用的框架,其提供的一系列抽象屏蔽了不一樣類型消息中間件使用上的差別,同時也大大簡化了Spring在整合消息中間件時的使用複雜度。html
Spring Cloud Stream 提供了Binder(負責與消息中間件進行交互)java
# 其餘參數默認配置 spring.rabbitmq.host=你的host
// 該註解表示綁定Sink消息通道 @EnableBinding(Sink.class) public class MsgReceiver { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class); // 自帶 消費者 @StreamListener(Sink.INPUT) public void receive(Object payload){ logger.info("received: " + payload); } }
public interface MyChannel { String INPUT = "test-input"; String OUTPUT = "test-output"; // 收 @Input(INPUT) SubscribableChannel input(); // 發 @Output(OUTPUT) MessageChannel output(); }
// 綁定自定義消息通道 @EnableBinding(MyChannel.class) public class MsgReceiver1 { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class); // 收 @StreamListener(MyChannel.INPUT) public void receive(Object payload){ logger.info("received1: " + payload + ":" + new Date()); } }
package com.sundown.stream.controller; import com.sundown.stream.bean.ChatMessage; import com.sundown.stream.msg.MyChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.support.MessageBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.GenericMessage; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Random; @RestController public class HelloController { @Autowired MyChannel myChannel; @GetMapping("/hello") public void hello(){ String message = "welcome spring cloud stream"; myChannel.output().send(MessageBuilder.withPayload(message).build()); } }
spring.cloud.stream.bindings.test-input.destination=test-topic spring.cloud.stream.bindings.test-output.destination=test-topic
java -jar stream-0.0.1-SNAPSHOT.jar
和java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081
運行訪問http://localhost:8080/hello
spring.cloud.stream.bindings.test-input.destination=test-topic spring.cloud.stream.bindings.test-output.destination=test-topic spring.cloud.stream.bindings.test-input.group=gg spring.cloud.stream.bindings.test-output.group=gg
spring.cloud.stream.bindings.test-input.destination=test-topic spring.cloud.stream.bindings.test-output.destination=test-topic spring.cloud.stream.bindings.test-input.group=gg spring.cloud.stream.bindings.test-output.group=gg # 開啓消費分區(消費者上配置) spring.cloud.stream.bindings.test-input.consumer.partitioned=true # 消費者實例個數(消費者上配置) spring.cloud.stream.instance-count=2 # 當前實例下標(消費者上配置) spring.cloud.stream.instance-index=0
@RestController public class HelloController { @Autowired MyChannel myChannel; @GetMapping("/hello") public void hello(){ String message = "welcome spring cloud stream"; // 先寫死 int whichPart = 1; System.out.println("發送消息:" + message + ",發往分區:" + whichPart); myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build()); } }
java -jar stream-0.0.1-SNAPSHOT.jar --spring.cloud.stream.instance-index=0
java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --spring.cloud.stream.instance-index=0
(別忘了先關閉啓動類 否則打包會報錯)@GetMapping("/hello") public void hello(){ String message = "welcome spring cloud stream"; int whichPart = new Random().nextInt(2); System.out.println("發送消息:" + message + ",發往分區:" + whichPart); myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build()); }
雖然定時任務能夠用cron表達式 可是對於一些特殊的定時任務 可使用stream+rabbitmq更合適 好比幾分鐘後執行
rabbitmq插件安裝web
spring.rabbitmq.host=xxx spring.cloud.stream.bindings.test-input.destination=topic spring.cloud.stream.bindings.test-output.destination=topic spring.cloud.stream.rabbit.bindings.test-input.consumer.delayed-exchange=true spring.cloud.stream.rabbit.bindings.test-output.producer.delayed-exchange=true #spring.cloud.stream.bindings.test-input.destination=test-topic #spring.cloud.stream.bindings.test-output.destination=test-topic # #spring.cloud.stream.bindings.test-input.group=gg #spring.cloud.stream.bindings.test-output.group=gg # ## 開啓消費分區(消費者上配置) #spring.cloud.stream.bindings.test-input.consumer.partitioned=true ## 消費者實例個數(消費者上配置) #spring.cloud.stream.instance-count=2 ## 當前實例下標(消費者上配置) #spring.cloud.stream.instance-index=0 # ## 生產者配置 #spring.cloud.stream.bindings.test-output.producer.partition-key-expression=headers['whichPart'] ## 消費節點數量 #spring.cloud.stream.bindings.test-output.producer.partition-count=2
// 綁定自定義消息通道 @EnableBinding(MyChannel.class) public class MsgReceiver1 { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class); // 收 @StreamListener(MyChannel.INPUT) public void receive(Object payload){ // 添加日期 一會好對比 logger.info("received1: " + payload + ":" + new Date()); } }
@RestController public class HelloController { private static final Logger logger = LoggerFactory.getLogger(HelloController.class); @Autowired MyChannel myChannel; @GetMapping("/delay") public void delay(){ String message = "welcome spring cloud stream"; logger.info("send msg:" + new Date()); // x-delay --> 延遲3s myChannel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 3000).build()); } }
stream自帶的與自定義(添加destination=xxx)之間的相似和區別
解決重複消費 分組(group)
消息分組單個實例訪問(開啓消費分區 實例個數 實例下標 生產者配置 消費節點數)
定時器 rabbitmq相關的插件安裝運行 後端代碼實現(配置delayed-exchange和destination以及controller 發送時添加setHeader("x-delay", 3000) 3s延時)spring