RabbitMQ系列之基本概念和基本用法

  1. 本文主要整理一下與 RabbitMQ 相關的一些基本概念,瞭解這些概念,是使用好 RabbitMQ 的基礎;
  2. 並使用直連型交換機作了一個demo,經過實例去更好的理解這些概念。文末提供有 demo 下載地址。

AMQP

  • 即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計;
  • AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
  • RabbitMQ 是一個開源的 AMQP 實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、PHP等。

備註:想對 AMQP 有深刻了解能夠點這裏查看。git

RabbitMQ 的一些基本概念

經過下面這張圖片,咱們能夠對 RabbitMQ 的模型有一個大概的瞭解:
rabbitMQ模型.pnggithub

基本概念

須要瞭解的基本概念主要以下:web

  • Producer:消息生產者。
  • Consumer:消息消費者。
  • Connection(鏈接):Producer 和 Consumer 經過 TCP 鏈接到 RabbitMQ Server。
  • Channel(信道):基於 Connection 建立,數據流動都是在 Channel 中進行。
  • Broker(消息代理):實際上就是消息服務器實體。
  • Vhost(虛擬主機) : 虛擬主機,一個消息代理(Broker)裏能夠開設多個虛擬主機(Vhost),用做不一樣用戶的權限分離。
  • Exchange(交換機) : 用來發送消息的 AMQP 實體,它指定消息按什麼路由規則,路由到哪一個隊列。
  • Queue(消息隊列) :是 RabbitMQ 的內部對象,用於存儲消息。每一個消息都會被投入到一個或多個隊列。且多個消費者能夠訂閱同一個 Queue(這時 Queue 中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理)。
  • Binding(綁定) : 它的做用就是把交換機(Exchange)和隊列(Queue)按照路由規則綁定起來。
  • Routing Key(路由鍵) :消息發送給 Exchange(交換機)時,消息將擁有一個路由鍵(默認爲空), Exchange(交換機)根據這個路由鍵將消息發送到匹配的隊列中。
  • Binding Key(綁定鍵):指定當前 Exchange(交換機)下,什麼樣的 Routing Key(路由鍵)會被下派到當前綁定的 Queue 中。

交換機的三種類型

  • Direct:徹底匹配,消息路由到那些 Routing Key 與 Binding Key 徹底匹配的 Queue 中。好比 Routing Key 爲 test1,則只會轉發轉發 test1,不會轉發 test2。
  • Topic:模式匹配,Exchange 會把消息發送到一個或者多個知足通配符規則的 routing-key 的 Queue。spring

    • *號表示匹配一個 word(好比:知足 a.*.c的 routing-key 有 a.test1.c);
    • #號匹配多個 word 和路徑,路徑之間經過 . 隔開(好比:知足 a.#.c的 routing-key 有 a.test1.test2.c);
  • Fanout:忽略匹配,把全部發送到該 Exchange 的消息路由到全部與它綁定的 Queue 中。

經過實例去深刻理解

一會兒看到這麼多概念,可能會有點發懵。下面咱們經過一個很簡單的 demo 去深刻理解一下這些概念。
panda.jpeg安全

SpringBoot 工程添加 RabbitMQ

pom文件添加依賴

建立一個 SpringBoot 項目,在 pom.xml文件裏添加依賴:服務器

<!--mq-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
設置 RabbitMQ 配置參數
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

建立 RabbitMQ 配置類

添加一個 RabbitMQ 的配置類 RabbitConfig:微信

/**
 * RabbitMQ配置
 * @公衆號 全棧在路上
 * @GitHub https://github.com/liuyongfei1
 * @author lyf
 * @date 2020-05-17 17:20
 */
Configuration
public class RabbitConfig {

    /**
     * 聲明隊列
     * 參數說明:
     * durable 是否持久化,默認是false(持久化隊列則數據會被存儲在磁盤上,當消息代理重啓時數據不會丟失;暫存隊列只對當前鏈接有效)
     * exclusive 默認是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable
     * autoDelete 默認是false,是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
     * 通常設置一下隊列的持久化就好,其他兩個就是默認false
     * @return Queue
     */
    @Bean
    Queue myQueue() {
        return new Queue(QueueConstants.QUEUE_NAME, true);
    }

    /**
     * 設置交換機,類型爲 direct
     * @return DirectExchange
     */
    @Bean
    DirectExchange myExchange() {
        return new DirectExchange(QueueConstants.QUEUE_EXCHANGE_NAME, true, false);
    }

    /**
     * 綁定:將交換機和隊列綁定,並設置路由匹配鍵
     * @return Binding
     */
    @Bean
    Binding queueBinding() {
        return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE_ROUTING_KEY_NAME);
    }
}

消息生產端

/**
 * 消息生產端
 * @公衆號 全棧在路上
 * @GitHub https://github.com/liuyongfei1
 * @author lyf
 * @date 2020-05-17 18:30
 */
@RestController
public class ProducerController {

    /**
     * RabbitTemplate提供了發送/接收消息的方法
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 發送消息(交換機類型爲 Direct)
     * @return
     */
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        // 生成消息的惟一id
        String msgId = UUID.randomUUID().toString();
        String messageData = "hello,this is rabbitmq demo message";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        // 定義要發送的消息對象
        Map<String,Object> messageObj = new HashMap<>();
        messageObj.put("msgId",msgId);
        messageObj.put("messageData",messageData);
        messageObj.put("createTime",createTime);

        rabbitTemplate.convertAndSend(QueueConstants.QUEUE_EXCHANGE_NAME,QueueConstants.QUEUE_ROUTING_KEY_NAME, messageObj);
        return "message send ok";
    }

}

生產消息

代碼保存後,啓動服務,使用 Postman 請求生產消息接口:
postman.pngapp

查看 RabbitMQ 管理界面

​Postman 請求成功,咱們打開 RabbitMQ 的管理界面:
rabbitmq-web1.pngdom

能夠看到有一個名爲 demo1_queue 的隊列,說明咱們測試的消息已經推送到RabbitMq 服務器上面了:spring-boot

  • Total 爲 1

    • 由於使用Postman就請求了接口一次,所以生產端總共就生產了 1 條消息。
  • Ready 爲 1

    • 由於咱們尚未啓動消費端,因此沒有被消費的消息數量爲 1。

消息消費端

/**
 * 消息消費端
 * @公衆號 全棧在路上
 * @GitHub https://github.com/liuyongfei1
 * @author lyf
 * @date 2020-05-17 18:00
 */
@Component
@RabbitListener(queues = {QueueConstants.QUEUE_NAME})
public class ConsumerController {

    @RabbitHandler
    public void handler(Map message) throws IOException {
        System.out.println("收到消息:" + message.toString());
    }
}

保存代碼,重啓服務,在 Idea 的終端窗口,能夠看到以前生產的那條消息已經被被消費了:
consumer-console.png
因爲當前隊列就 1 條消息 且已經被成功消費掉了,再次訪問 RabbitMQ 管理界面,會發現 Ready 和 Total 已經更新爲 0。

demo下載地址

  1. 文中 demo 代碼下載請點擊這裏
  2. 歡迎你們關注掃描二維碼或 添加微信公衆號:全棧在路上
  3. 微信公衆號二維碼.jpg
相關文章
相關標籤/搜索