Spring Cloud 集成 RabbitMQ

同步 or 異步

前言:咱們如今有一個用微服務架構模式開發的系統,系統裏有一個商品服務和訂單服務,且它們都是同步通訊的。java

目前咱們商品服務和訂單服務之間的通訊方式是同步的,當業務擴大以後,若是還繼續使用同步的方式進行服務之間的通訊,會使得服務之間的耦合增大。例如咱們登陸操做可能須要同步調用用戶服務、積分服務、短信服務等等,而服務之間可能又依賴別的服務,那麼這樣一個登陸過程就會耗費很多的時間,以至用戶的體驗下降。web

那咱們在微服務架構下要如何對服務之間的通訊進行解耦呢?這就須要使用到消息中間件了,消息中間件能夠幫助咱們將同步的通訊轉化爲異步通訊,服務之間只須要對消息隊列進行消息的發佈、訂閱便可,從而解耦服務之間的通訊依賴。spring

目前較爲主流的消息中間件:數據庫

  • RabbitMQ
  • Kafka
  • ActiveMQ

異步通訊特色:json

  • 客戶端請求不會阻塞進程,服務端的響應能夠是非即時的

異步的常見形態:設計模式

  • 推送通知
  • 請求/異步響應
  • 消息隊列

MQ應用場景:bash

  • 異步處理
  • 流量削峯
  • 日誌處理
  • 應用解耦

更多關於消息中間件的描述,能夠參考我另外一篇文章:架構


RabbitMQ的基本使用(上)

在上文 Spring Cloud Config - 統一配置中心 中,已經演示過使用Docker安裝RabbitMQ,因此這裏就再也不浪費篇幅演示了。app

直接進入正題,咱們以訂單服務和商品服務示例,首先在訂單服務的項目中,加入mq的依賴:框架

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中增長RabbitMQ的相關配置項:
Spring Cloud 集成 RabbitMQ

到訂單服務的項目中,新建一個message包,在該包中建立一個MqReceiver類,咱們來看看RabbitMQ的基本操做。代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @program: sell_order
 * @description: 接收消息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {

    /**
     * 接收消息並打印
     *
     * @param message message
     */
    @RabbitListener(queues = "myQueue")
    public void process(String message) {
        // @RabbitListener註解用於監聽RabbitMQ,queues指定監聽哪一個隊列
        log.info(message);
    }
}

由於RabbitMQ上尚未myQueue這個隊列,因此咱們還獲得RabbitMQ的管理界面上,建立這個隊列,以下:
Spring Cloud 集成 RabbitMQ

而後新建一個測試類,用於發送消息到隊列中,代碼以下:

package org.zero.springcloud.order.server;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @program: sell_order
 * @description: 發送消息,即消息發佈者
 * @author: 01
 * @create: 2018-08-21 22:28
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSenderTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        for (int i = 0; i < 100; i++) {
            amqpTemplate.convertAndSend("myQueue", "第" + i + "條消息");
        }
    }
}

運行該測試類,運行成功後到OrderApplication的控制檯上,看看是否接收並打印了接收到的消息。正常狀況應以下:
Spring Cloud 集成 RabbitMQ

基本的消費者和發佈者的代碼咱們都已經編寫過,而且也測試成功了。但有個小問題,咱們要監聽一個不存在的隊列時,須要手動去新建這個隊列,感受每次都手動新建挺麻煩的。有沒有辦法當隊列不存在時,自動建立該隊列呢?答案是有的,依舊使用以前的那個註解,只不過此次的參數要換成queuesToDeclare。示例代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @program: sell_order
 * @description: 接收消息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {

    /**
     * 接收並打印消息
     * 能夠當隊列不存在時自動建立隊列
     *
     * @param message message
     */
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process2(String message) {
        // @RabbitListener註解用於監聽RabbitMQ,queuesToDeclare能夠建立指定的隊列
        log.info(message);
    }
}

RabbitMQ的基本使用(下)

以上咱們經過示例簡單的介紹了消息的收發及隊列的建立,本小節則介紹一下exchange 的自動綁定方式。當須要自動綁定 exchange 時,咱們也能夠經過 bindings 參數完成。示例代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @program: sell_order
 * @description: 接收消息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {

    /**
     * 接收並打印消息
     * 能夠當隊列不存在時自動建立隊列,以及自動綁定指定的Exchange
     * @param message message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("myQueue"),
            exchange = @Exchange("myExchange")
    ))
    public void process3(String message) {
        // @RabbitListener註解用於監聽RabbitMQ,bindings能夠建立指定的隊列及自動綁定Exchange
        log.info(message);
    }
}

消息分組咱們也是能夠經過 bindings 參數完成,例如如今有一個數碼供應商服務和一個水果供應商服務,它們都監聽着同一個訂單服務的消息隊列。但我但願數碼訂單的消息被數碼供應商服務消費,而水果訂單的消息被水果供應商服務消費。因此咱們就須要用到消息分組。示例代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @program: sell_order
 * @description: 接收消息,即消費者
 * @author: 01
 * @create: 2018-08-21 22:24
 **/
@Slf4j
@Component
public class MqReceiver {

    /**
     * 數碼供應商服務 - 接收消息
     *
     * @param message message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("computerOrder"),
            exchange = @Exchange("myOrder"),
            key = "computer"  // 指定路由的key
    ))
    public void processComputer(String message) {
        log.info("computer message : {}", message);
    }

    /**
     * 水果供應商服務 - 接收消息
     *
     * @param message message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("computerOrder"),
            exchange = @Exchange("myOrder"),
            key = "fruit"  // 指定路由的key
    ))
    public void processFruit(String message) {
        log.info("fruit message : {}", message);
    }
}

測試代碼以下,經過指定key進行消息的分組,將消息發送到數碼供應商服務:

package org.zero.springcloud.order.server;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @program: sell_order
 * @description: 發送消息,即消息發佈者
 * @author: 01
 * @create: 2018-08-21 22:28
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSenderTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void sendOrder() {
        for (int i = 0; i < 100; i++) {
            // 第一個參數指定隊列,第二個參數來指定路由的key,第三個參數指定消息
            amqpTemplate.convertAndSend("myOrder", "computer", "第" + i + "條消息");
        }
    }
}

重啓項目後,運行以上測試代碼,控制檯輸出以下,能夠看到只有數碼供應商服務纔可以接收到消息,而水果供應商服務是接收不到的。這就完成了消息分組:
Spring Cloud 集成 RabbitMQ


Spring Cloud Stream的使用(上)

Spring Cloud Stream 是一個用來爲微服務應用構建消息驅動能力的框架。它能夠基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程序。他經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。目前僅支持RabbitMQ、Kafka。

什麼是Spring Integration ? Integration 集成

企業應用集成(EAI)是集成應用之間數據和服務的一種應用技術。四種集成風格:

  1. 文件傳輸:兩個系統生成文件,文件的有效負載就是由另外一個系統處理的消息。該類風格的例子之一是針對文件輪詢目錄或FTP目錄,並處理該文件。
  2. 共享數據庫:兩個系統查詢同一個數據庫以獲取要傳遞的數據。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。
  3. 遠程過程調用:兩個系統都暴露另外一個能調用的服務。該類例子有EJB服務,或SOAP和REST服務。
  4. 消息:兩個系統鏈接到一個公用的消息系統,互相交換數據,並利用消息調用行爲。該風格的例子就是衆所周知的中心輻射式的(hub-and-spoke)JMS架構。

Spring Integration做爲一種企業級集成框架,聽從現代經典書籍《企業集成模式》,爲開發者提供了一種便捷的實現模式。Spring Integration構建在Spring控制反轉設計模式之上,抽象了消息源和目標,利用消息傳送和消息操做來集成應用環境下的各類組件。消息和集成關注點都被框架處理,因此業務組件能更好地與基礎設施隔離,從而下降開發者所要面對的複雜的集成職責。

模型圖:
Spring Cloud 集成 RabbitMQ

如今咱們來看看Spring Cloud Stream的基本使用,到訂單服務項目上,增長以下依賴:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

而後是在配置文件中,配置rabbitmq的相關信息,只不過咱們以前已經配置過了因此不用配置了。

咱們來看看如何使用Spring Cloud Stream發送和接收消息,首先建立一個接口,定義input和output方法。代碼以下:

package org.zero.springcloud.order.server.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {

    // 接收消息、入口
    @Input("myMessageInput")
    SubscribableChannel input();

    // 發送消息、
    @Output("myMessageOutput")
    MessageChannel output();
}

建立一個消息接收者。代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * @program: sell_order
 * @description: 消息接收者
 * @author: 01
 * @create: 2018-08-22 22:16
 **/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    @StreamListener("myMessageOutput")
    public void process(String message) {
        log.info("message : {}", message);
    }
}

消息發送者,這裏做爲一個Controller存在。代碼以下:

package org.zero.springcloud.order.server.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zero.springcloud.order.server.message.StreamClient;

/**
 * @program: sell_order
 * @description: 消息發送者
 * @author: 01
 * @create: 2018-08-22 22:18
 **/
@RestController
public class SendMessageController {

    private final StreamClient streamClient;

    @Autowired
    public SendMessageController(StreamClient streamClient) {
        this.streamClient = streamClient;
    }

    @GetMapping("/send/msg")
    public void send() {
        for (int i = 0; i < 100; i++) {
            MessageBuilder<String> messageBuilder = MessageBuilder.withPayload("這是第" + i + "條消息");
            streamClient.output().send(messageBuilder.build());
        }
    }
}

由於咱們的微服務可能會部署多個實例,如有多個實例須要對消息進行分組,不然全部的服務實例都會接收到相同的消息。在配置文件中,增長以下配置完成消息的分組:

spring:
  ...
  cloud:
    ...
    stream:
      bindings:
        myMessageOutput:
          group: order
...

重啓項目,訪問http://localhost:9080/send/msg,控制檯輸出以下:
Spring Cloud 集成 RabbitMQ

注:Spring Cloud Stream能夠在項目啓動的時候自動建立隊列,在項目關閉的時候自動刪除隊列


Spring Cloud Stream的使用(下)

在實際的開發中,咱們通常發送的消息一般會是一個java對象而不是字符串。因此咱們來看看如何發送對象,其實和發送字符串幾乎是同樣的。消息發送者代碼以下:

package org.zero.springcloud.order.server.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zero.springcloud.order.server.dto.OrderDTO;
import org.zero.springcloud.order.server.message.StreamClient;

/**
 * @program: sell_order
 * @description: 消息發送者
 * @author: 01
 * @create: 2018-08-22 22:18
 **/
@RestController
public class SendMessageController {

    private final StreamClient streamClient;

    @Autowired
    public SendMessageController(StreamClient streamClient) {
        this.streamClient = streamClient;
    }

    /**
     * 發送OrderDTO對象
     */
    @GetMapping("/send/msg")
    public void send() {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("123465");

        MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
        streamClient.output().send(messageBuilder.build());
    }
}

消息接收者也只須要在方法參數上聲明這個對象的類型便可。代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import org.zero.springcloud.order.server.dto.OrderDTO;

/**
 * @program: sell_order
 * @description: 消息接收者
 * @author: 01
 * @create: 2018-08-22 22:16
 **/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    /**
     * 接收OrderDTO對象
     * @param message message
     */
    @StreamListener("myMessageOutput")
    public void process(OrderDTO message) {
        log.info("message : {}", message);
    }
}

另外須要提到的一點是,默認狀況下,java對象在消息隊列中是以base64編碼存在的,咱們也都知道base64不可讀。爲了方便查看堆積在消息隊列裏的對象數據,咱們但願java對象是以json格式的字符串呈現,這樣就方便咱們人類閱讀。至於這個問題,咱們只須要在配置文件中,增長一段content-type的配置便可。以下:

spring:
  ...
  cloud:
    ...
    stream:
      bindings:
        myMessageOutput:
          group: order
          content-type: application/json
...

重啓項目,訪問http://localhost:9080/send/msg,控制檯輸出以下:

2018-08-22 23:32:33.704  INFO 12436 --- [nio-9080-exec-4] o.z.s.o.server.message.StreamReceiver    : message : OrderDTO(orderId=123465, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=null, orderStatus=null, payStatus=null, createTime=null, updateTime=null, orderDetailList=null)

當咱們接收到消息的時候,可能會須要返回一段特定的消息,表示消息已收到之類的。至於這個功能,咱們經過@SendTo註解便可完成。代碼以下:

package org.zero.springcloud.order.server.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import org.zero.springcloud.order.server.dto.OrderDTO;

/**
 * @program: sell_order
 * @description: 消息接收者
 * @author: 01
 * @create: 2018-08-22 22:16
 **/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    /**
     * 接收OrderDTO對象
     * @param message message
     */
    @StreamListener("myMessageOutput")
    @SendTo("myMessageInput")
    public String process(OrderDTO message) {
        log.info("message : {}", message);

        return "success";
    }

    @StreamListener("myMessageInput")
    public void success(String message) {
        log.info("message : {}", message);
    }
}

重啓項目,訪問http://localhost:9080/send/msg,控制檯輸出以下:
Spring Cloud 集成 RabbitMQ

Spring Cloud Stream 再一次簡化了咱們在分佈式環境下對消息中間件的操做,配置好消息中間件的鏈接地址及用戶密碼後,在開發的過程當中,咱們只須要關注input和output,對消息中間件的操做基本是無感知的。

相關文章
相關標籤/搜索