使用 RabbitMQ 實現異步調用

引言

除了上篇文章所講的 ActiveMQ,還有一種流行的開源消息中間件叫 RabbitMQ。和 ActiveMQ 相比,它具備更高的性能。web

RabbitMQ 再也不基於 JMS 規範,也沒有選擇 Java 做爲底層實現語言。 它基於另外一種消息通訊協議,名爲 AMQP,並採用 Erlang 語言做爲技術實現。 RabbitMQ 提供了衆多語言客戶端,可以與 Spring 框架整合,Spring Boot 也提供了對 RabbitMQ 的支持。spring

RabbitMQ 官網: http://www.rabbitmq.comdocker

啓動 RabbitMQ 服務器

運行 rabbitmq 容器

RabbitMQ 官方已經提供了本身的 Docker 容器,先下載 rabbitmq:3-management 鏡像來啓動 RabbitMQ 容器, 之因此選擇這個鏡像是由於它擁有一個 web 控制檯,能夠經過瀏覽器來訪問。shell

docker pull rabbitmq:3-management

RabbitMQ 除了控制檯,還提供了 HTTP API 方式,可方便應用程序使用。瀏覽器

下面使用以下 Docker 命令啓動 RabbitMQ服務器

docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

在啓動 RabbitMQ 容器時,它對宿主機暴露了兩個端口號架構

  • 15672: 表示RabbitMQ 控制檯端口號,可在瀏覽器中經過控制檯來執行 RabbitMQ 的相關操做
  • 5672 表示 RabbitMQ 監聽的TCP 端口號,應用程序能夠經過該端口號與 RabbitMQ 創建 TCP 鏈接,並完成後續的異步消息通訊

此外,啓動時還有兩個環境變量app

  • RABBITMQ_DEFAULT_USER : 設置控制檯默認用戶名, 默認爲 guest
  • RABBITMQ_DEFAULT_PASS: 設置控制檯默認密碼,默認爲 guest

RabbitMQ 控制檯

RabbitMQ 容器啓動完畢後,打開瀏覽器,並在地址欄中輸入 http://localhost:15672/ ,而且輸入登陸的用戶名和密碼,就能夠看到控制檯以下所示
框架

在上面管理界面中,包含 6 個功能菜單

  • Overview: 用於查看 RabbitMQ 基本信息
  • Connections: 用於查看 RabbitMQ 客戶端的鏈接信息
  • Channels: 用於查看 RabbitMQ 的通道
  • Exchanges:用於查看 RabbitMQ 的交換機
  • Queues: 用於查看 RabbitMQ 的隊列
  • Admin: 用於管理 RabbitMQ 的用戶,虛擬主機,策略等數據

Exchange 和 Queue

RabbitMQ 只有 Queue, 沒有 Topic,由於可經過 Exchange 與 Queue 的組合來實現 Topic 所具有的功能。RabbitMQ 的消息模型以下圖所示

在 Exchange 和 Queue 間有一個 Binding 關係,當消息從 Producer 發送到 Exchange 中時,會根據 Binding 來路由消息的去向。

  • 若是 Binding 各不相同,那麼該消息將路由到其中一個 Queue 中,隨後將被一個 Consumer 所消費,此時實現了 "點對點"的消息通訊模型。
  • 若是 Binding 徹底相同,那麼該消息就會路由到每一個 Queue 中,隨後將會被每一個 Consumer 消費,這樣就實現了 「發佈與訂閱」 的消息通訊模型

所以可將 Binding 理解爲 Exchange 到 Queue 的路由規則,這些規則可經過 RabbitMQ 所提供的客戶端 API 來控制,也可經過 RabbitMQ 提供的控制檯來管理。

RabbitMQ 提供了一個默認的 Exchange(AMQP default),在控制檯的 Exchange 菜單中就能夠看到它,簡單狀況下,只須要使用默認的 Exchange 便可,當須要提供發佈與訂閱功能時纔會使用自定義的 Exchange。

開發服務端和客戶端

下面咱們就將 Spring Boot 與 RabbitMQ 進行整合,先開發一個服務端做爲消息的消費者,再開發一個客戶端做爲消息的生產者,隨後運行客戶端,並查看服務端中接收到的消息。

開發服務端

建立一個名爲 rabbitmq-hello-server 的 maven 項目或者 Spring Starter Project, 在 pom.xml 文件中添加下面 Maven 依賴

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

Spring Boot 框架中已經添加了對 RabbitMQ 的支持,只須要依賴 spring-boot-starter-amqp 就能夠啓動 RabbitMQ,此時還須要在 application.properties 配置文件中添加 RabbitMQ 的相關配置項

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

接下來建立 HelloServer 類,封裝服務端代碼

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloServer {

    @RabbitListener(queues = "hello-queue")
    public void receive(String message) {
        System.out.println(message);
    }
}

只須要在 receive() 方法上定義 @RabbitListener ,而且設置 queues 參數來指定消費者須要監聽的的隊列名稱。

最後,編寫一個 Spring Boot 應用程序啓動類來啓動服務器

package demo.msa.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqHelloServerApplication {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqHelloServerApplication.class, args);
    }

}

在 RabbitMQ 中,必須經過程序來顯式建立隊列。服務端啓動完畢後,將持續監聽 RabbitMQ 的 hello-queue 隊列中即將到來的消息,該消息由客戶端來發送。

開發客戶端

建立一個名爲 rabbitmq-hello-client 的 maven 項目或者 Spring Starter Project, pom 中的依賴與服務端一致。客戶端的 application.properties 文件與服務端一致。

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

接下來建立一個名爲 HelloClient 的類,將其做爲客戶端

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloClient {
    
    @Autowired
    private RabbitTemplate rabbitmqTemplate;
    
    public void send(String message) {
        rabbitmqTemplate.convertAndSend("hello-queue", message);
    }
}

最後編寫 Spring Boot 應用程序啓動類來啓動客戶端

package demo.msa.rabbitmq;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqHelloClientApplication {
    
    @Autowired
    private HelloClient helloClient;
    
    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }
    
    @PostConstruct
    public void init() {
        helloClient.send("hello world!");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqHelloClientApplication.class, args).close();
    }

}

與服務端同樣,此處使用 @Bean 註解的 helloQueue() 方法建立一個名爲 hello-queue 的隊列,這樣能夠保證當客戶端在服務端以前啓動時,也能建立所需的隊列。並且 RabbitMQ 能夠確保不會建立同名的隊列,所以可分別在服務端與客戶端建立同名的隊列。

運行 main 方法能夠啓動客戶端應用程序,此時將在服務端看到客戶端發送的消息,也能夠在 RabbitMQ 控制檯中看到消息隊列當前的狀態。

Java Bean 類型傳輸

上面發送和接收的消息只是 String 類型,若是發送的消息是一個普通的 Java Bean 類型,應該如何調用呢?

Java Bean 類型則必須實現 Serializable 序列化接口才能正常調用,這是由於 RabbitMQ 所傳送的消息是 byte[] 類型,當客戶端發送消息須要進行序列化(也就是講 Java 類型轉換爲 byte[] 類型),當服務端接收消息前須要先反序列化,所以發送和接收的消息對象必須實現 JDK 的序列化接口。

除了這種序列化方式外,咱們也可使用 Jackson 來實現,並且 RabbitMQ 已經爲咱們提供了 jackson 序列化的方式,這種方式更加高效。所須要作的是定義一個 Jackson2JsonMessageConverter 的 Spring Bean。

@Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

結語

RabbitMQ 的性能很是高效和穩定,也能很是方便的與 Spring Boot 應用程序集成,還擁有很是豐富的官方文檔和控制檯,所以選擇 RabbitMQ 做爲服務之間的異步消息調用平臺,將成爲整個微服務架構中的 "消息中心"。

參考

  • 《架構探險—輕量級微服務架構》
相關文章
相關標籤/搜索