SpringCloud Stream消息驅動能夠簡化開發人員對消息中間件的使用複雜度,讓系統開發人員更專注於核心業務邏輯的開發。SpringCloud Stream基於SpringBoot實現,自動配置化的功能能夠幫助咱們快速上手學習,類似於咱們以前學習的orm框架,能夠平滑地切換多種不一樣的數據庫。
目前SpringCloud Stream只支持RabbitMQ和Kafka。
經過定義綁定器做爲中間層,實現了應用程序與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel通道,使得應用程序不須要再考慮各種不一樣的消息中間件的實現。當須要升級消息中間件,或者是更換其他消息中間件產品時,咱們須要作的就是更換對應的Binder綁定器,而不須要修改任何應用邏輯。
在該模型圖上有以下幾個核心概念:
<!-- Producer module: SpringBoot parent plus Web and Stream-Rabbit starters. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>
<dependencies>
    <!-- SpringBoot Web component integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream RabbitMQ binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
</dependencies>
server:
  port: 9000
spring:
  application:
    # Fixed typo in original: "spingcloud" -> "springcloud"
    name: springcloud-stream-producer
# RabbitMQ connection settings (commented out: defaults to localhost:5672)
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
建立管道
// 建立管道接口 public interface SendMessageInterface { // 建立一個輸出管道,用於發送消息 @Output("my_msg") SubscribableChannel sendMsg(); }
@RestController
public class SendMsgController {

    @Autowired
    private SendMessageInterface sendMessageInterface;

    /**
     * Publishes a random UUID payload onto the "my_msg" output channel.
     *
     * @return "success" once the message has been handed to the binder
     */
    @RequestMapping("/sendMsg")
    public String sendMsg() {
        String msg = UUID.randomUUID().toString();
        System.out.println("生產者發送內容msg:" + msg);
        // Send the String payload directly instead of msg.getBytes():
        // getBytes() with no argument uses the platform-default charset,
        // and the consumer's @StreamListener method takes a String anyway.
        // Also use the generic Message<String> rather than the raw type.
        Message<String> message = MessageBuilder.withPayload(msg).build();
        sendMessageInterface.sendMsg().send(message);
        return "success";
    }
}
@SpringBootApplication @EnableBinding(SendMessageInterface.class) // 開啓綁定 public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }
<!-- Consumer module: identical starters to the producer module. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>
<dependencies>
    <!-- SpringBoot Web component integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream RabbitMQ binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
</dependencies>
server:
  port: 9000
spring:
  application:
    # Fixed typo in original: "spingcloud" -> "springcloud"
    name: springcloud-stream-consumer
# RabbitMQ connection settings (commented out: defaults to localhost:5672)
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
public interface RedMsgInterface { // 從管道中獲取消息 @Input("my_msg") SubscribableChannel redMsg(); }
/**
 * Message listener component.
 * Consumes every payload arriving on the "my_msg" channel and logs it.
 */
@Component
public class Consumer {

    @StreamListener("my_msg")
    public void listener(String msg) {
        System.out.println("消費者獲取生產消息:" + msg);
    }
}
/**
 * Consumer bootstrap class.
 * {@code @EnableBinding} activates the input channel declared on
 * {@link RedMsgInterface} at startup.
 */
@SpringBootApplication
@EnableBinding(RedMsgInterface.class)
public class AppConsumer {

    public static void main(String[] args) {
        SpringApplication.run(AppConsumer.class, args);
    }
}
在現實的業務場景中,每個微服務應用爲了實現高可用和負載均衡,都會集羣部署。按照上面的方式啓動了兩個應用實例後,消息會被重複消費兩次。爲解決這個問題,Spring Cloud Stream 中提供了消費組:經過配置 spring.cloud.stream.bindings.my_msg.group 屬性(其中 my_msg 爲代碼中聲明的管道名稱)爲應用指定一個組名。下面修改配置文件:
server:
  port: 8001
spring:
  application:
    name: spring-cloud-stream
# RabbitMQ connection settings (commented out: defaults to localhost:5672)
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
  cloud:
    stream:
      bindings:
        # Binding key must match the channel name declared via
        # @Input("my_msg") / @Output("my_msg"); the original key "mymsg"
        # does not match, so the group setting would never take effect.
        my_msg:
          # All instances sharing this group consume competitively, so each
          # message is delivered to only one instance of the "stream" group.
          group: stream
/**
 * Message listener component for the consumer-group demo.
 * Logs each payload from "my_msg" together with this instance's HTTP port,
 * so that competing-consumer behavior across instances is visible.
 */
@Component
public class Consumer {

    /** Injected from server.port; identifies which instance consumed the message. */
    @Value("${server.port}")
    private String serverPort;

    @StreamListener("my_msg")
    public void listener(String msg) {
        System.out.println("消費者獲取生產消息:" + msg + ",端口號:" + serverPort);
    }
}
<!-- Swap the RabbitMQ binder for the Kafka binder. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
server:
  port: 9000
spring:
  cloud:
    stream:
      # Kafka binder configuration (used instead of RabbitMQ)
      kafka:
        binder:
          # Kafka broker list (default: localhost)
          brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
          # ZooKeeper node list the binder connects to (default: localhost)
          zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
          # Minimum number of partitions per topic
          minPartitionCount: 1
          # Create topics automatically if they do not exist
          autoCreateTopics: true
          # Add partitions automatically when required
          autoAddPartitions: true
server:
  port: 8000
spring:
  application:
    name: springcloud_kafka_consumer
  cloud:
    stream:
      # Moved from spring.cloud.* to spring.cloud.stream.*:
      # instanceCount/instanceIndex are Spring Cloud Stream properties used
      # for partitioned-consumer placement and are read from this level.
      instance-count: 1
      instance-index: 0
      kafka:
        binder:
          brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
          zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        input:
          # Kafka topic to read from
          destination: my_msg
          # Consumer group: only one member of "s1" receives each message
          group: s1
          consumer:
            # Manual offset commit
            autoCommitOffset: false
            concurrency: 1
            partitioned: false