深刻淺出 RabbitMQ

什麼是 RabbitMQ

簡介(優勢)

  • 基於 ErLang 語言開發有高可用高併發的優勢,適合集羣。
  • 開源、穩定、易用、跨平臺、支持多種語言、文檔齊全。
  • 有消息確認機制和持久化機制,可靠性高。

概念

生產者和消費者

  • Producer:消息的生產者
  • Consumer:消息的消費者

Queue

  • 消息隊列提供了 FIFO 的處理機制,具備緩存消息的能力。在 RabbitMQ 中,隊列消息能夠設置爲持久化,臨時或者自動刪除。
  • 若是是持久化的隊列,Queue 中的消息會在 Server 本地硬盤存儲一份,防止系統 Crash 數據丟失。
  • 若是是臨時的隊列,Queue 中的數據在系統重啓以後就會丟失。
  • 如實是自動刪除的隊列,當不存在用戶鏈接到 Server,隊列中的數據會被自動刪除。

ExChange

ExChange 相似於數據通訊網絡中的交換機,提供消息路由策略。java

RabbitMQ 中,生產者不是將消息直接發送給 Queue,而是先發送給 ExChangeExChange 根據生產者傳遞的 key 按照特定的路由算法將消息給指定的 Queue。一個 ExChange 能夠綁定多個 Queue。和 Queue 同樣,ExChange 也能夠設置爲持久化、臨時或者自動刪除。算法

Binding

所謂綁定就是將一個特定的 ExChange 和一個特定的 Queue 綁定起來。ExChangeQueue 的綁定能夠是多對多的關係。spring

Virtual Host

RabbitMQ Server上能夠建立多個虛擬的 Message Broker(又叫作 Virtual Hosts)。每個 vhost 本質上是一個迷你的 RabbitMQ Server,分別管理各自的 ExChangebinding。生產者和消費者鏈接 RabbitMQ Server 須要指定一個 Virtual Hostdocker

使用過程

  1. 客戶端鏈接到消息隊列服務器,打開一個 Channel
  2. 客戶端聲明一個 ExChange,並設置相關屬性。
  3. 客戶端聲明一個 Queue,並設置相關屬性。
  4. 客戶端使用 Routing Key,在 ExChangeQueue 之間創建好綁定關係。
  5. 客戶端投遞消息到 ExChange
  6. ExChange 接收到消息後,就根據消息的 key 和已經設置的 bingding,進行消息路由,將消息投遞到一個或多個隊列裏。

部署 RabbitMQ

使用 Docker Compose 部署

建立 docker-compose.yml緩存

version: '3.1'
services:
  rabbitmq:
    restart: always
    image: rabbitmq:management
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: rabbit
      RABBITMQ_DEFAULT_PASS: 123456
    volumes:
      - ./data:/var/lib/rabbitmq

RabbitMQ WebUI 界面

  • 訪問地址:http://{ip}:15672服務器

  • 首頁網絡

    深刻淺出 RabbitMQ

  • Global counts併發

    深刻淺出 RabbitMQ

  • 交換機頁app

    深刻淺出 RabbitMQ

  • 隊列頁ide

    深刻淺出 RabbitMQ

    • Name:消息隊列的名稱,這裏是經過程序建立的

    • Features:消息隊列的類型,durable:true 爲會持久化消息

    • Ready:準備好的消息

    • Unacked:未確認的消息

    • Total:所有消息

    若是都爲 0 則說明所有消息處理完成

使用 RabbitMQ

建立生產者

建立一個名爲 spring-boot-amqp-provider 的生產者項目。

相關配置

  • 建立 application.yml 文件

    spring:
    application:
      name: spring-boot-amqp
    rabbitmq:
      host: 192.168.75.133
      port: 5672
      username: rabbit
      password: 123456
  • 建立隊列

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
    * 隊列配置
    */
    @Configuration
    public class RabbitMQConfiguration {
    
      @Bean
      public Queue queue() {
          return new Queue("helloRabbitMQ");
      }
    }
  • 建立消息提供者

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
    * 消息提供者
    */
    @Component
    public class RabbitMQProvider {
    
      @Autowired
      private AmqpTemplate amqpTemplate;
    
      public void send() {
          String context = "hello" + new Date();
          System.out.println("Provider: " + context);
          amqpTemplate.convertAndSend("helloRabbitMQ", context);
      }
    }

發送消息

建立測試用例

import com.lusifer.spring.boot.amqp.Application;
import com.lusifer.spring.boot.amqp.provider.HelloRabbitProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class AmqpTest {

    @Autowired
    private HelloRabbitProvider helloRabbitProvider;

    @Test
    public void testSender() {
        for (int i = 0; i < 10; i++) {
            RabbitMQProvider.send();
        }
    }
}

建立消費者

建立一個名爲 spring-boot-amqp-consumer 的消費者項目。

相關配置

建立 application.yml 文件

spring:
  application:
    name: spring-boot-amqp-consumer
  rabbitmq:
    host: 192.168.75.133
    port: 5672
    username: rabbit
    password: 123456

接收消息

建立消息的消費監聽組件

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloRabbitMQ")
public class HelloRabbitConsumer {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Consumer: " + message);
    }
}
相關文章
相關標籤/搜索