RabbitMQ詳解

介紹RabbitMQ前,有必須先了解一下AMQP協議。AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括如下組件:正則表達式

 

1. Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。安全

2. Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host服務器

3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。異步

4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。函數

5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。性能

6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 操作系統

7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。線程

8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。設計

9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。

在瞭解了AMQP模型之後,須要簡單介紹一下AMQP的協議棧,AMQP協議自己包括三層:

 

1.       Modle Layer,位於協議最高層,主要定義了一些供客戶端調用的命令,客戶端能夠利用這些命令實現本身的業務邏輯,例如,客戶端能夠經過queue.declare聲明一個隊列,利用consume命令獲取一個隊列中的消息。

2.       Session Layer,主要負責將客戶端的命令發送給服務器,在將服務器端的應答返回給客戶端,主要爲客戶端與服務器之間通訊提供可靠性、同步機制和錯誤處理。

3.       Transport Layer,主要傳輸二進制數據流,提供幀的處理、信道複用、錯誤檢測和數據表示。

 

從AMQP協議能夠看出,MessageQueue、Exchange和Binding構成了AMQP協議的核心,下面咱們就圍繞這三個主要組件    從應用使用的角度全面的介紹如何利用Rabbit MQ構建消息隊列以及使用過程當中的注意事項。

 

 

1. 聲明MessageQueue

      在Rabbit MQ中,不管是生產者發送消息仍是消費者接受消息,都首先須要聲明一個MessageQueue。這就存在一個問題,是生產者聲明仍是消費者聲明呢?要解決這個問題,首先須要明確:

a)消費者是沒法訂閱或者獲取不存在的MessageQueue中信息。

b)消息被Exchange接受之後,若是沒有匹配的Queue,則會被丟棄。

在明白了上述兩點之後,就容易理解若是是消費者去聲明Queue,就有可能會出如今聲明Queue以前,生產者已發送的消息被丟棄的隱患。若是應用可以經過消息重發的機制容許消息丟失,則使用此方案沒有任何問題。可是若是不能接受該方案,這就須要不管是生產者仍是消費者,在發送或者接受消息前,都須要去嘗試創建消息隊列。這裏有一點須要明確,若是客戶端嘗試創建一個已經存在的消息隊列,Rabbit MQ不會作任何事情,並返回客戶端創建成功的。

       若是一個消費者在一個信道中正在監聽某一個隊列的消息,Rabbit MQ是不容許該消費者在同一個channel去聲明其餘隊列的。Rabbit MQ中,能夠經過queue.declare命令聲明一個隊列,能夠設置該隊列如下屬性:

a) Exclusive:排他隊列,若是一個隊列被聲明爲排他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點:其一,排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是能夠同時訪問同一個鏈接建立的排他隊列的。其二,「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣。其三,即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。

b)   Auto-delete:自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

 c)   Durable:持久化,這個會在後面做爲專門一個章節討論。

d)  其餘選項,例如若是用戶僅僅想查詢某一個隊列是否已存在,若是不存在,不想創建該隊列,仍然能夠調用queue.declare,只不過須要將參數passive設爲true,傳給queue.declare,若是該隊列已存在,則會返回true;若是不存在,則會返回Error,可是不會建立新的隊列。

2. 生產者發送消息

        在AMQP模型中,Exchange是接受生產者消息並將消息路由到消息隊列的關鍵組件。ExchangeType和Binding決定了消息的路由規則。因此生產者想要發送消息,首先必需要聲明一個Exchange和該Exchange對應的Binding。能夠經過 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,聲明一個Exchange須要三個參數:ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在建立Binding和生產者經過publish推送消息時須要指定。ExchangeType,指Exchange的類型,在RabbitMQ中,有三種類型的Exchange:direct ,fanout和topic,不一樣的Exchange會表現出不一樣路由行爲。Durable是該Exchange的持久化屬性,這個會在消息持久化章節討論。聲明一個Binding須要提供一個QueueName,ExchangeName和BindingKey。下面咱們就分析一下不一樣的ExchangeType表現出的不一樣路由規則。

        生產者在發送消息時,都須要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey之後,會判斷該ExchangeType:

                         a) 若是是Direct類型,則會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue中。

 
                  b)  若是是 Fanout 類型,則會將消息發送給全部與該 Exchange 定義過 Binding 的全部 Queues 中去,實際上是一種廣播行爲。
         
 

        c)若是是Topic類型,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,若是匹配成功,則發送到對應的Queue中。

             

3. 消費者訂閱消息    

    在RabbitMQ中消費者有2種方式獲取隊列中的消息:

       a)  一種是經過basic.consume命令,訂閱某一個隊列中的消息,channel會自動在處理完上一條消息以後,接收下一條消息。(同一個channel消息處理是串行的)。除非關閉channel或者取消訂閱,不然客戶端將會一直接收隊列的消息。

             b)  另一種方式是經過basic.get命令主動獲取隊列中的消息,可是絕對不能夠經過循環調用basic.get來代替basic.consume,這是由於basic.get RabbitMQ在實際執行的時候,是首先consume某一個隊列,而後檢索第一條消息,而後再取消訂閱。若是是高吞吐率的消費者,最好仍是建議使用basic.consume。

      若是有多個消費者同時訂閱同一個隊列的話,RabbitMQ是採用循環的方式分發消息的,每一條消息只能被一個訂閱者接收。例如,有隊列Queue,其中ClientA和ClientB都Consume了該隊列,MessageA到達隊列後,被分派到ClientA,ClientA回覆服務器收到響應,服務器刪除MessageA;再有一條消息MessageB抵達隊列,服務器根據「循環推送」原則,將消息會發給ClientB,而後收到ClientB的確認後,刪除MessageB;等到再下一條消息時,服務器會再將消息發送給ClientA。

       這裏咱們能夠看出,消費者再接到消息之後,都須要給服務器發送一條確認命令,這個便可以在handleDelivery裏顯示的調用basic.ack實現,也能夠在Consume某個隊列的時候,設置autoACK屬性爲true實現。這個ACK僅僅是通知服務器能夠安全的刪除該消息,而不是通知生產者,與RPC不一樣。 若是消費者在接到消息之後還沒來得及返回ACK就斷開了鏈接,消息服務器會重傳該消息給下一個訂閱者,若是沒有訂閱者就會存儲該消息。

        既然RabbitMQ提供了ACK某一個消息的命令,固然也提供了Reject某一個消息的命令。當客戶端發生錯誤,調用basic.reject命令拒絕某一個消息時,能夠設置一個requeue的屬性,若是爲true,則消息服務器會重傳該消息給下一個訂閱者;若是爲false,則會直接刪除該消息。固然,也能夠經過ack,讓消息服務器直接刪除該消息而且不會重傳。

4. 持久化:

        Rabbit MQ默認是不持久隊列、Exchange、Binding以及隊列中的消息的,這意味着一旦消息服務器重啓,全部已聲明的隊列,Exchange,Binding以及隊列中的消息都會丟失。經過設置Exchange和MessageQueue的durable屬性爲true,可使得隊列和Exchange持久化,可是這還不能使得隊列中的消息持久化,這須要生產者在發送消息的時候,將delivery mode設置爲2,只有這3個所有設置完成後,才能保證服務器重啓不會對現有的隊列形成影響。這裏須要注意的是,只有durable爲true的Exchange和durable爲ture的Queues才能綁定,不然在綁定時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的性能形成比較大的影響,可能會降低10倍不止。

5. 事務:

     對事務的支持是AMQP協議的一個重要特性。假設當生產者將一個持久化消息發送給服務器時,由於consume命令自己沒有任何Response返回,因此即便服務器崩潰,沒有持久化該消息,生產者也沒法獲知該消息已經丟失。若是此時使用事務,即經過txSelect()開啓一個事務,而後發送消息給服務器,而後經過txCommit()提交該事務,便可以保證,若是txCommit()提交了,則該消息必定會持久化,若是txCommit()還未提交即服務器崩潰,則該消息不會服務器就收。固然Rabbit MQ也提供了txRollback()命令用於回滾某一個事務。

6. Confirm機制:

      使用事務當然能夠保證只有提交的事務,纔會被服務器執行。可是這樣同時也將客戶端與消息服務器同步起來,這背離了消息隊列解耦的本質。Rabbit MQ提供了一個更加輕量級的機制來保證生產者能夠感知服務器消息是否已被路由到正確的隊列中——Confirm。若是設置channel爲confirm狀態,則經過該channel發送的消息都會被分配一個惟一的ID,而後一旦該消息被正確的路由到匹配的隊列中後,服務器會返回給生產者一個Confirm,該Confirm包含該消息的ID,這樣生產者就會知道該消息已被正確分發。對於持久化消息,只有該消息被持久化後,纔會返回Confirm。Confirm機制的最大優勢在於異步,生產者在發送消息之後,便可繼續執行其餘任務。而服務器返回Confirm後,會觸發生產者的回調函數,生產者在回調函數中處理Confirm信息。若是消息服務器發生異常,致使該消息丟失,會返回給生產者一個nack,表示消息已經丟失,這樣生產者就能夠經過重發消息,保證消息不丟失。Confirm機制在性能上要比事務優越不少。可是Confirm機制,沒法進行回滾,就是一旦服務器崩潰,生產者沒法獲得Confirm信息,生產者其實自己也不知道該消息吃否已經被持久化,只有繼續重發來保證消息不丟失,可是若是原先已經持久化的消息,並不會被回滾,這樣隊列中就會存在兩條相同的消息,系統須要支持去重。

消息持久化是 RabbitMQ 最爲人津津樂道的特性之一, RabbitMQ 可以在付出最小的性能代價的基礎上實現消息的持久化,最大的奧祕就在於 RabbitMQ 多層消息隊列的設計上。下面,本文就從 MessageQueue 的設計和消息在 MessageQueue 的生命週期兩個方面全面介紹  RabbitMQ 的消息隊列。

   RabbitMQ徹底實現了AMQP協議,相似於一個郵箱服務。Exchange負責根據ExchangeType和RoutingKey將消息投遞到對應的消息隊列中,消息隊列負責在消費者獲取消息前暫存消息。在RabbitMQ中,MessageQueue主要由兩部分組成,一個爲AMQQueue,主要負責實現AMQP協議的邏輯功能。另一個是用來存儲消息的BackingQueue,本文重點關注的是BackingQueue的設計。

   

    在RabbitMQ中BackingQueue又由5個子隊列組成:Q一、Q二、Delta、Q3和Q4。RabbitMQ中的消息一旦進入隊列,不是固定不變的,它會隨着系統的負載在隊列中不斷流動,消息的狀態不斷髮生變化。RabbitMQ中的消息一共有5種狀態:

   a)Alpha:消息的內容和消息索引都保存在內存中;

   b)Beta:消息內容保存在磁盤上,消息索引保存在內存中;

   c)Gamma:消息內容保存在磁盤上,消息索引在磁盤和內存都有;

   d)Delta:消息內容和索引都在磁盤上;

   注意:對於持久化的消息,消息內容和消息索引都必須先保存到磁盤上,纔會處於上述狀態中的一種,而Gamma狀態的消息只有持久化的消息纔會有該狀態。

 

      BackingQueue 中的 5 個子隊列中的消息狀態, Q1 和 Q4 對應的是 Alpha 狀態, Q2 和 Q3 是 Beta 狀態, Delta 對應的是 Delta 狀態。上述就是 RabbitMQ 的多層隊列結構的設計,咱們能夠看出從 Q1 到 Q4 ,基本經歷的是由 RAM 到 DISK,再到 RAM 的設計。這樣的設計的好處就是當隊列負載很高的狀況下,可以經過將一部分消息由磁盤保存來節省內存空間,當負載下降的時候,這部分消息又漸漸回到內存,被消費者獲取,使得整個隊列有很好的彈性。下面咱們就來看一下,整個消息隊列的工做流程。

     引發消息流動主要有兩方面的因素:其一是消費者獲取消息;其二是因爲內存不足,引發消息的換出到磁盤上( Q1-.>Q2 、 Q2->Delta 、 Q3->Delta 、 Q4->Q3 )。 RabbitMQ 在系統運行時會根據消息傳輸的速度計算一個當前內存中可以保存的最大消息數量( Target_RAM_Count ),當內存中的消息數量大於該值時,就會引發消息的流動。進入隊列的消息,通常會按着 Q1->Q2->Delta->Q3->Q4 的順序進行流動,可是並非每條消息都必定會經歷全部的狀態,這個取決於當時系統的負載情況。
       當消費者獲取消息時,首先會從 Q4 隊列中獲取消息,若是 Q4 獲取成功,則返回,若是 Q4 爲空,則嘗試從 Q3 獲取消息;首先,系統會判斷 Q3 隊列是否爲空,若是爲空,則直接返回隊列爲空,即此時隊列中無消息(後續會論證)。若是不爲空,則取出 Q3 的消息,而後判斷此時 Q3 和 Delta 隊列的長度,若是都爲空,則可認爲 Q2 、 Delta 、 Q3 和 Q4 所有爲空 (後續說明 ) ,此時將 Q1 中消息直接轉移到 Q4 中,下次直接從 Q4 中獲取消息。若是 Q3 爲空, Delta 不空,則將 Delta 中的消息轉移到 Q3 中;若是 Q3 非空,則直接下次從 Q3 中獲取消息。在將 Delta 轉移到 Q3 的過程當中, RabbitMQ 是按照索引分段讀取的,首先讀取某一段,直到讀到的消息非空爲止,而後判斷讀取的消息個數與 Delta 中的消息個數是否相等,若是相等,則判定此時 Delta 中已無消息,則直接將 Q2 和剛讀到的消息一併放入 Q3 中。若是不相等,則僅將這次讀到的消息轉移到Q3 中。這就是消費者引發的消息流動過程。
    

     下面咱們分析一下因爲內存不足引發的消息換出。消息換出的條件是內存中保存的消息數量 + 等待 ACK 的消息的數量 >Target_RAM_Count 。當條件觸發時,系統首先會判斷若是當前進入等待 ACK 的消息的速度大於進入隊列的消息的速度時,會先處理等待 ACK 的消息。步驟基本上 Q1->Q2 或者 Q3 移動,取決於 Delta 隊列是否爲空。 Q4->Q3 移動, Q2 和Q3 向 Delta 移動。

    最後,咱們來分析一下前面遺留的兩個問題,一個是爲何 Q3 隊列爲空便可認定整個隊列爲空。試想若是 Q3 爲空,Delta 不空,則在 Q3 取出最後一條消息時, Delta 上的消息就會被轉移到 Q3 上,與 Q3 空矛盾。若是 Q2 不空,則在 Q3 取出最後一條消息,若是 Delta 爲空時,會將 Q2 的消息併入 Q3 ,與 Q3 爲空矛盾。若是 Q1 不空,則在 Q3 取出最後一條消息,若是 Delta 和 Q3 均爲空時,則將 Q1 的消息轉移到 Q4 中,與 Q4 爲空矛盾。這也解釋了另一個問題,即爲何 Q3 和Delta 爲空, Q2 就爲空。
    上述就是整個消息在 RabbitMQ 隊列中流動過程。從上述流程能夠看出,消息若是可以被儘早消費掉,就不須要經歷持久化的過程,由於這樣會加系統的開銷。若是消息被消費的速度過慢, RabbitMQ 經過換出內存的方式,防止內存溢出。
相關文章
相關標籤/搜索