RabbitMQ實戰:理解消息通訊

本系列是「RabbitMQ實戰:高效部署分佈式消息隊列」書籍的總結筆記。java

前段時間總結完了「深刻淺出MyBatis」系列,對MyBatis有了更全面和深刻的瞭解,在掘金社區也收到了一些博友的喜歡,很高興。另外,短暫的陪產假就要結束了,小寶也二週了,下週二就要投入工做了,但願本身儘快調整過來,加油努力。bash

從本篇開始總結「RabbitMQ實戰」系列的閱讀筆記,RabbitMQ是一個開源的消息代理和隊列服務器,能夠經過基本協議在徹底不一樣的應用之間共享數據,能夠將做業排隊以便讓分佈式服務進行處理。服務器

本篇介紹下消息通訊,首先介紹基礎概念,將這些概念映射到AMQP協議,而後介紹消息持久化、發送方確認模式等消息可靠性保證。微信

經過本篇介紹,你會了解到:session

  • 消息通訊概念:消費者、生產者和代理
  • AMQP元素:隊列、交換器、綁定
  • 虛擬主機
  • 消息持久化
  • 發送方確認模式

消息通訊概念

此部分的介紹,會牽涉到AMQP的元素,若是以前沒接觸過的,能夠結合下面的「AMQP元素」進行理解。併發

消息

消息是傳輸的主體,消息包括兩部分:有效載荷(payload)和標籤(label);有效載荷是要傳輸的數據,能夠是任何內容,好比JSON串、二進制、自定義的數據協議等;標籤描述了有效載荷,而且Rabbit用它來決定誰將得到消息的投遞。分佈式

能夠與HTTP協議類比,HTTP消息頭部描述了消息體的類型、大小等,HTTP消息體是要傳輸的數據,HTTP服務端經過消息頭部決定如何處理請求和數據。工具

生產者和消費者

生產者建立消息,而後發送到代理服務器(RabbitMQ Server),AMQP只會用標籤表述這條消息(一個交換器名稱和可選的主題標記),Rabbit服務器會根據標籤把消息發送給訂閱的消費者。性能

消費者消費消息,它會訂閱到隊列(queue)上,每當有消息到達RabbitMQ服務器時,會發送給消費者,消費者收到消息時,會進行處理。spa

注意:消費者收到的消息只包括有效載荷,全部不會知道是從哪裏發來的。

鏈接和信道

要想發佈或消費消息,必須先與RabbitMQ Server創建一條TCP鏈接,創建TCP鏈接以後,要建立一條信道,信道是創建在真實TCP鏈接的虛擬鏈接。

AMQP命令都是經過信道發送出去的,每條信道會被指派一個惟一的ID,爲何不直接經過TCP鏈接發送AMQP命令呢? 由於操做系統創建和銷燬TCP會話是很昂貴的,並且建立的鏈接數也有限。 經過引入通道,能夠在鏈接上創建通道,並且通道是私密的,相互不受影響。

通道的概念仍是有點抽象,後面專門寫一篇文章進行分析介紹,這裏簡單理解下吧。

AMQP元素

AMQP消息路由有三部分組成:隊列、交換器和綁定,隊列是存放消息的地方,交換器是決定不一樣的分發策略,綁定是隊列和交換器的橋樑,定義匹配規則。

生產者發送消息到交換器,交換器根據自身類型和綁定規則,將消息存放在對應隊列中,而後將消息發送到監聽隊列的消費者。

AMQP基本模型

如上圖:P爲生產者,X爲交換器,交換器類型爲direct,根據不一樣的綁定規則(orange、black、green),分發給不一樣的隊列,C爲消費者,從不一樣的隊列介紹消息。

隊列

消費者經過兩種方式從特定的隊列接收消息:

  • basic.consume,這樣會將信道置爲接收模式,直到取消對隊列的訂閱;
  • basic.get,主動讓消費者接收隊列中的下一條消息;

basic.get會影響性能,推薦使用basic.consume來實現高吞吐量,由於其處理過程是先訂閱消息,獲取單條消息,再取取消訂閱。

若是隊列擁有多個消費者時,隊列收到的消息將以循環的方式發給消費者,即多個消費者平均消費這些消息。

另外,消費者接收到的每一條消息都要進行確認,必須經過basic.ack命令向rabbitmq服務端發送一個確認。 也能夠設置auto_ack爲true,只要消費者接收到消息,就自動視爲確認,不過不建議這樣,由於接收到不表明業務邏輯處理成功。 服務端接收到確認後,會從隊列中刪除對應消息。

還有一種場景,在接收到消息後,若是不想處理,能夠經過下面方式處理:

  • 把消費者從RabbitMQ服務器斷開鏈接,,這樣RabbitMQ會自動將消息入隊併發送給另一個消費者;
  • 若是不想發送給其餘消費者處理,就是想忽略這個消息,能夠發送basic.reject命令;

最後來介紹下如何建立隊列,首先明確下是生成者仍是消費者建立,關鍵點是:生產者可否承擔起丟失消息,由於發出去的消息若是路由到了不存在的隊列,Rabbit會忽略它們。因此,建議生成者和消費者都嘗試去建立隊列,能夠經過設置queue.declare的passive選項設置爲ture來判斷隊列是否存在,若是不存在會返回一個錯誤。

經過queue.declare命令來建立隊列,有一些選項說明下:

  • exclusive:若是設置true的化,隊列將變成私有的,只有建立隊列的應用程序纔可以消費隊列消息;
  • auto-delete:當最後一個消費者取消訂閱的時候,隊列會自動移除;
  • durable:是否要持久化;
queueDeclare(String queue, 
            boolean durable, 
            boolean exclusive, 
            Map<String, Object> arguments);
複製代碼
交換器和綁定

交換器有四種類型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由鍵,不太實用,就不詳細介紹了。

第一種:direct交換器

direct交換器比較簡單,若是和路由鍵 徹底匹配 的話,就會投遞到對應的隊列:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
複製代碼

服務器默認包含一個空白字符串名稱的默認路由器,當聲明一個隊列時,會自定綁定到默認交換器,並以隊列名稱做爲路由鍵。

第二種:fanout交換器

fanout交換器,不處理路由鍵,只須要簡單的將隊列綁定到交換機上,爲會每一個消費者自動生成一個隨機隊列,全部的消費者都會收到全部消息。

fanout交換器

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
複製代碼

第三種:topic交換器

topic交換器,將路由鍵和某模式進行匹配,此時隊列須要綁定要一個模式上。

tipic交換器

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
複製代碼

關於模式,符號#匹配一個或多個詞,符號*匹配一個詞,所以kfs.#可以匹配到kfs.session.message,可是audit.*只會匹配到audit.session。

虛擬主機

每一個RabbitMQ服務器都能建立虛擬消息服務器,稱爲虛擬主機(vhost),每一個RabbitMQ本質上是一個mini版的RabbitMQ服務器,擁有本身的隊列、交換器、綁定,還有本身的權限機制。

鏈接時,必須制定vhost,rabbitmq包含了默認的vhost:"/"。當建立一個用戶時,會被指派給至少一個vhsot,而且相互隔離。

vhost不能經過AMQP協議建立,須要使用rabbitmqctl工具建立。

消息持久化和發送方確認模式

若是沒有持久化,重啓rabbitmq後,隊列、交換器都會消失,RabbitMQ提供了持久化的功能,須要知足如下三個條件:

  • 交換器設置爲持久化,經過durable屬性;
  • 隊列設置爲持久化,經過durable屬性;
  • 消息投遞模式delivery設置爲2;

當發佈一條持久化消息到持久化交換器上時,rabbit會在消息提交到日誌文件後纔會發送響應,全部會損失性能,因此,只對重要數據持久化便可。

考慮這種狀況:因爲發佈消息後,不返回任何信息給生產者,如何只對服務器已經持久化到硬盤了呢,可能在傳輸過程當中丟失,或者持久化前服務器宕機,致使消息丟失。

RabbitMQ經過「發送方確認模式」來解決上面的問題。首先,須要將信道設置成confirm模式,這樣全部在信道上發佈的消息都會被指派一個惟一的ID號,一旦消息被投遞到全部匹配的隊列或持久化到磁盤,會發送一個確認消息給生產者。

經過本篇的介紹,對Rabbit的消息模型有了總體瞭解,下一篇會寫個DEMO,並介紹下運行和管理RabbitMQ。

歡迎掃描下方二維碼,關注個人我的微信公衆號 ~

情情說
相關文章
相關標籤/搜索