前言:咱們如今有一個用微服務架構模式開發的系統,系統裏有一個商品服務和訂單服務,且它們都是同步通訊的。java
目前咱們商品服務和訂單服務之間的通訊方式是同步的,當業務擴大以後,若是還繼續使用同步的方式進行服務之間的通訊,會使得服務之間的耦合增大。例如咱們登陸操做可能須要同步調用用戶服務、積分服務、短信服務等等,而服務之間可能又依賴別的服務,那麼這樣一個登陸過程就會耗費很多的時間,以至用戶的體驗下降。web
那咱們在微服務架構下要如何對服務之間的通訊進行解耦呢?這就須要使用到消息中間件了,消息中間件能夠幫助咱們將同步的通訊轉化爲異步通訊,服務之間只須要對消息隊列進行消息的發佈、訂閱便可,從而解耦服務之間的通訊依賴。spring
目前較爲主流的消息中間件:數據庫
異步通訊特色:json
異步的常見形態:設計模式
MQ應用場景:bash
更多關於消息中間件的描述,能夠參考我另外一篇文章:架構
在上文 Spring Cloud Config - 統一配置中心 中,已經演示過使用Docker安裝RabbitMQ,因此這裏就再也不浪費篇幅演示了。app
直接進入正題,咱們以訂單服務和商品服務示例,首先在訂單服務的項目中,加入mq的依賴:框架
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在配置文件中增長RabbitMQ的相關配置項:
到訂單服務的項目中,新建一個message包,在該包中建立一個MqReceiver類,咱們來看看RabbitMQ的基本操做。代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消費者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收消息並打印 * * @param message message */ @RabbitListener(queues = "myQueue") public void process(String message) { // @RabbitListener註解用於監聽RabbitMQ,queues指定監聽哪一個隊列 log.info(message); } }
由於RabbitMQ上尚未myQueue這個隊列,因此咱們還獲得RabbitMQ的管理界面上,建立這個隊列,以下:
而後新建一個測試類,用於發送消息到隊列中,代碼以下:
package org.zero.springcloud.order.server; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @program: sell_order * @description: 發送消息,即消息發佈者 * @author: 01 * @create: 2018-08-21 22:28 **/ @RunWith(SpringRunner.class) @SpringBootTest public class MqSenderTest { @Autowired private AmqpTemplate amqpTemplate; @Test public void send() { for (int i = 0; i < 100; i++) { amqpTemplate.convertAndSend("myQueue", "第" + i + "條消息"); } } }
運行該測試類,運行成功後到OrderApplication的控制檯上,看看是否接收並打印了接收到的消息。正常狀況應以下:
基本的消費者和發佈者的代碼咱們都已經編寫過,而且也測試成功了。但有個小問題,咱們要監聽一個不存在的隊列時,須要手動去新建這個隊列,感受每次都手動新建挺麻煩的。有沒有辦法當隊列不存在時,自動建立該隊列呢?答案是有的,依舊使用以前的那個註解,只不過此次的參數要換成queuesToDeclare
。示例代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消費者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收並打印消息 * 能夠當隊列不存在時自動建立隊列 * * @param message message */ @RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process2(String message) { // @RabbitListener註解用於監聽RabbitMQ,queuesToDeclare能夠建立指定的隊列 log.info(message); } }
以上咱們經過示例簡單的介紹了消息的收發及隊列的建立,本小節則介紹一下exchange 的自動綁定方式。當須要自動綁定 exchange 時,咱們也能夠經過 bindings 參數完成。示例代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消費者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收並打印消息 * 能夠當隊列不存在時自動建立隊列,以及自動綁定指定的Exchange * @param message message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("myQueue"), exchange = @Exchange("myExchange") )) public void process3(String message) { // @RabbitListener註解用於監聽RabbitMQ,bindings能夠建立指定的隊列及自動綁定Exchange log.info(message); } }
消息分組咱們也是能夠經過 bindings 參數完成,例如如今有一個數碼供應商服務和一個水果供應商服務,它們都監聽着同一個訂單服務的消息隊列。但我但願數碼訂單的消息被數碼供應商服務消費,而水果訂單的消息被水果供應商服務消費。因此咱們就須要用到消息分組。示例代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消費者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 數碼供應商服務 - 接收消息 * * @param message message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("computerOrder"), exchange = @Exchange("myOrder"), key = "computer" // 指定路由的key )) public void processComputer(String message) { log.info("computer message : {}", message); } /** * 水果供應商服務 - 接收消息 * * @param message message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("computerOrder"), exchange = @Exchange("myOrder"), key = "fruit" // 指定路由的key )) public void processFruit(String message) { log.info("fruit message : {}", message); } }
測試代碼以下,經過指定key進行消息的分組,將消息發送到數碼供應商服務:
package org.zero.springcloud.order.server; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @program: sell_order * @description: 發送消息,即消息發佈者 * @author: 01 * @create: 2018-08-21 22:28 **/ @RunWith(SpringRunner.class) @SpringBootTest public class MqSenderTest { @Autowired private AmqpTemplate amqpTemplate; @Test public void sendOrder() { for (int i = 0; i < 100; i++) { // 第一個參數指定隊列,第二個參數來指定路由的key,第三個參數指定消息 amqpTemplate.convertAndSend("myOrder", "computer", "第" + i + "條消息"); } } }
重啓項目後,運行以上測試代碼,控制檯輸出以下,能夠看到只有數碼供應商服務纔可以接收到消息,而水果供應商服務是接收不到的。這就完成了消息分組:
Spring Cloud Stream 是一個用來爲微服務應用構建消息驅動能力的框架。它能夠基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程序。他經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。目前僅支持RabbitMQ、Kafka。
什麼是Spring Integration ? Integration 集成
企業應用集成(EAI)是集成應用之間數據和服務的一種應用技術。四種集成風格:
Spring Integration做爲一種企業級集成框架,聽從現代經典書籍《企業集成模式》,爲開發者提供了一種便捷的實現模式。Spring Integration構建在Spring控制反轉設計模式之上,抽象了消息源和目標,利用消息傳送和消息操做來集成應用環境下的各類組件。消息和集成關注點都被框架處理,因此業務組件能更好地與基礎設施隔離,從而下降開發者所要面對的複雜的集成職責。
模型圖:
如今咱們來看看Spring Cloud Stream的基本使用,到訂單服務項目上,增長以下依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
而後是在配置文件中,配置rabbitmq的相關信息,只不過咱們以前已經配置過了因此不用配置了。
咱們來看看如何使用Spring Cloud Stream發送和接收消息,首先建立一個接口,定義input和output方法。代碼以下:
package org.zero.springcloud.order.server.message; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface StreamClient { // 接收消息、入口 @Input("myMessageInput") SubscribableChannel input(); // 發送消息、 @Output("myMessageOutput") MessageChannel output(); }
建立一個消息接收者。代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 消息接收者 * @author: 01 * @create: 2018-08-22 22:16 **/ @Slf4j @Component @EnableBinding(StreamClient.class) public class StreamReceiver { @StreamListener("myMessageOutput") public void process(String message) { log.info("message : {}", message); } }
消息發送者,這裏做爲一個Controller存在。代碼以下:
package org.zero.springcloud.order.server.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.zero.springcloud.order.server.message.StreamClient; /** * @program: sell_order * @description: 消息發送者 * @author: 01 * @create: 2018-08-22 22:18 **/ @RestController public class SendMessageController { private final StreamClient streamClient; @Autowired public SendMessageController(StreamClient streamClient) { this.streamClient = streamClient; } @GetMapping("/send/msg") public void send() { for (int i = 0; i < 100; i++) { MessageBuilder<String> messageBuilder = MessageBuilder.withPayload("這是第" + i + "條消息"); streamClient.output().send(messageBuilder.build()); } } }
由於咱們的微服務可能會部署多個實例,如有多個實例須要對消息進行分組,不然全部的服務實例都會接收到相同的消息。在配置文件中,增長以下配置完成消息的分組:
spring: ... cloud: ... stream: bindings: myMessageOutput: group: order ...
重啓項目,訪問http://localhost:9080/send/msg
,控制檯輸出以下:
注:Spring Cloud Stream能夠在項目啓動的時候自動建立隊列,在項目關閉的時候自動刪除隊列
在實際的開發中,咱們通常發送的消息一般會是一個java對象而不是字符串。因此咱們來看看如何發送對象,其實和發送字符串幾乎是同樣的。消息發送者代碼以下:
package org.zero.springcloud.order.server.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.zero.springcloud.order.server.dto.OrderDTO; import org.zero.springcloud.order.server.message.StreamClient; /** * @program: sell_order * @description: 消息發送者 * @author: 01 * @create: 2018-08-22 22:18 **/ @RestController public class SendMessageController { private final StreamClient streamClient; @Autowired public SendMessageController(StreamClient streamClient) { this.streamClient = streamClient; } /** * 發送OrderDTO對象 */ @GetMapping("/send/msg") public void send() { OrderDTO orderDTO = new OrderDTO(); orderDTO.setOrderId("123465"); MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO); streamClient.output().send(messageBuilder.build()); } }
消息接收者也只須要在方法參數上聲明這個對象的類型便可。代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; import org.zero.springcloud.order.server.dto.OrderDTO; /** * @program: sell_order * @description: 消息接收者 * @author: 01 * @create: 2018-08-22 22:16 **/ @Slf4j @Component @EnableBinding(StreamClient.class) public class StreamReceiver { /** * 接收OrderDTO對象 * @param message message */ @StreamListener("myMessageOutput") public void process(OrderDTO message) { log.info("message : {}", message); } }
另外須要提到的一點是,默認狀況下,java對象在消息隊列中是以base64編碼存在的,咱們也都知道base64不可讀。爲了方便查看堆積在消息隊列裏的對象數據,咱們但願java對象是以json格式的字符串呈現,這樣就方便咱們人類閱讀。至於這個問題,咱們只須要在配置文件中,增長一段content-type的配置便可。以下:
spring: ... cloud: ... stream: bindings: myMessageOutput: group: order content-type: application/json ...
重啓項目,訪問http://localhost:9080/send/msg
,控制檯輸出以下:
2018-08-22 23:32:33.704 INFO 12436 --- [nio-9080-exec-4] o.z.s.o.server.message.StreamReceiver : message : OrderDTO(orderId=123465, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=null, orderStatus=null, payStatus=null, createTime=null, updateTime=null, orderDetailList=null)
當咱們接收到消息的時候,可能會須要返回一段特定的消息,表示消息已收到之類的。至於這個功能,咱們經過@SendTo
註解便可完成。代碼以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; import org.zero.springcloud.order.server.dto.OrderDTO; /** * @program: sell_order * @description: 消息接收者 * @author: 01 * @create: 2018-08-22 22:16 **/ @Slf4j @Component @EnableBinding(StreamClient.class) public class StreamReceiver { /** * 接收OrderDTO對象 * @param message message */ @StreamListener("myMessageOutput") @SendTo("myMessageInput") public String process(OrderDTO message) { log.info("message : {}", message); return "success"; } @StreamListener("myMessageInput") public void success(String message) { log.info("message : {}", message); } }
重啓項目,訪問http://localhost:9080/send/msg
,控制檯輸出以下:
Spring Cloud Stream 再一次簡化了咱們在分佈式環境下對消息中間件的操做,配置好消息中間件的鏈接地址及用戶密碼後,在開發的過程當中,咱們只須要關注input和output,對消息中間件的操做基本是無感知的。