RabbitMQ中 exchange、route、queue的關係

從AMQP協議能夠看出,MessageQueue、Exchange和Binding構成了AMQP協議的核心,下面咱們就圍繞這三個主要組件    從應用使用的角度全面的介紹如何利用Rabbit MQ構建消息隊列以及使用過程當中的注意事項。python

 

 

  • 1. 聲明MessageQueuegit

      在Rabbit MQ中,不管是生產者發送消息仍是消費者接受消息,都首先須要聲明一個MessageQueue。這就存在一個問題,是生產者聲明仍是消費者聲明呢?要解決這個問題,首先須要明確:正則表達式

a)消費者是沒法訂閱或者獲取不存在的MessageQueue中信息。算法

b)消息被Exchange接受之後,若是沒有匹配的Queue,則會被丟棄。安全

在明白了上述兩點之後,就容易理解若是是消費者去聲明Queue,就有可能會出如今聲明Queue以前,生產者已發送的消息被丟棄的隱患。若是應用可以經過消息重發的機制容許消息丟失,則使用此方案沒有任何問題。可是若是不能接受該方案,這就須要不管是生產者仍是消費者,在發送或者接受消息前,都須要去嘗試創建消息隊列。這裏有一點須要明確,若是客戶端嘗試創建一個已經存在的消息隊列,Rabbit MQ不會作任何事情,並返回客戶端創建成功的。服務器

       若是一個消費者在一個信道中正在監聽某一個隊列的消息,Rabbit MQ是不容許該消費者在同一個channel去聲明其餘隊列的。Rabbit MQ中,能夠經過queue.declare命令聲明一個隊列,能夠設置該隊列如下屬性:併發

a) Exclusive:排他隊列,若是一個隊列被聲明爲排他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點:其一,排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是能夠同時訪問同一個鏈接建立的排他隊列的。其二,「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣。其三,即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。app

b)   Auto-delete:自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。負載均衡

 c)   Durable:持久化,這個會在後面做爲專門一個章節討論。異步

d)  其餘選項,例如若是用戶僅僅想查詢某一個隊列是否已存在,若是不存在,不想創建該隊列,仍然能夠調用queue.declare,只不過須要將參數passive設爲true,傳給queue.declare,若是該隊列已存在,則會返回true;若是不存在,則會返回Error,可是不會建立新的隊列。

  • 2. 生產者發送消息

        在AMQP模型中,Exchange是接受生產者消息並將消息路由到消息隊列的關鍵組件。ExchangeType和Binding決定了消息的路由規則。因此生產者想要發送消息,首先必需要聲明一個Exchange和該Exchange對應的Binding。能夠經過 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,聲明一個Exchange須要三個參數:ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在建立Binding和生產者經過publish推送消息時須要指定。ExchangeType,指Exchange的類型,在RabbitMQ中,有三種類型的Exchange:direct ,fanout和topic,不一樣的Exchange會表現出不一樣路由行爲。Durable是該Exchange的持久化屬性,這個會在消息持久化章節討論。聲明一個Binding須要提供一個QueueName,ExchangeName和BindingKey。下面咱們就分析一下不一樣的ExchangeType表現出的不一樣路由規則。

        生產者在發送消息時,都須要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey之後,會判斷該ExchangeType:

                         a) 若是是Direct類型,則會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue中。

 


 

                  b)   若是是  Fanout  類型,則會將消息發送給全部與該  Exchange  定義過  Binding  的全部  Queues  中去,實際上是一種廣播行爲。
           

 


 

        c)若是是Topic類型,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,若是匹配成功,則發送到對應的Queue中。

             

 

  • 3. 消費者訂閱消息    

    在RabbitMQ中消費者有2種方式獲取隊列中的消息:

       a)  一種是經過basic.consume命令,訂閱某一個隊列中的消息,channel會自動在處理完上一條消息以後,接收下一條消息。(同一個channel消息處理是串行的)。除非關閉channel或者取消訂閱,不然客戶端將會一直接收隊列的消息。

             b)  另一種方式是經過basic.get命令主動獲取隊列中的消息,可是絕對不能夠經過循環調用basic.get來代替basic.consume,這是由於basic.get RabbitMQ在實際執行的時候,是首先consume某一個隊列,而後檢索第一條消息,而後再取消訂閱。若是是高吞吐率的消費者,最好仍是建議使用basic.consume。

      若是有多個消費者同時訂閱同一個隊列的話,RabbitMQ是採用循環的方式分發消息的,每一條消息只能被一個訂閱者接收。例如,有隊列Queue,其中ClientA和ClientB都Consume了該隊列,MessageA到達隊列後,被分派到ClientA,ClientA服務器收到響應,服務器刪除MessageA;再有一條消息MessageB抵達隊列,服務器根據「循環推送」原則,將消息會發給ClientB,而後收到ClientB的確認後,刪除MessageB;等到再下一條消息時,服務器會再將消息發送給ClientA。

       這裏咱們能夠看出,消費者再接到消息之後,都須要給服務器發送一條確認命令,這個便可以在handleDelivery裏顯示的調用basic.ack實現,也能夠在Consume某個隊列的時候,設置autoACK屬性爲true實現。這個ACK僅僅是通知服務器能夠安全的刪除該消息,而不是通知生產者,與RPC不一樣。 若是消費者在接到消息之後還沒來得及返回ACK就斷開了鏈接,消息服務器會重傳該消息給下一個訂閱者,若是沒有訂閱者就會存儲該消息。

        既然RabbitMQ提供了ACK某一個消息的命令,固然也提供了Reject某一個消息的命令。當客戶端發生錯誤,調用basic.reject命令拒絕某一個消息時,能夠設置一個requeue的屬性,若是爲true,則消息服務器會重傳該消息給下一個訂閱者;若是爲false,則會直接刪除該消息。固然,也能夠經過ack,讓消息服務器直接刪除該消息而且不會重傳。

  • 4. 持久化:

        Rabbit MQ默認是不持久隊列、Exchange、Binding以及隊列中的消息的,這意味着一旦消息服務器重啓,全部已聲明的隊列,Exchange,Binding以及隊列中的消息都會丟失。經過設置Exchange和MessageQueue的durable屬性爲true,可使得隊列和Exchange持久化,可是這還不能使得隊列中的消息持久化,這須要生產者在發送消息的時候,將delivery mode設置爲2,只有這3個所有設置完成後,才能保證服務器重啓不會對現有的隊列形成影響。這裏須要注意的是,只有durable爲true的Exchange和durable爲ture的Queues才能綁定,不然在綁定時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的性能形成比較大的影響,可能會降低10倍不止。

  • 5. 事務:

     對事務的支持是AMQP協議的一個重要特性。假設當生產者將一個持久化消息發送給服務器時,由於consume命令自己沒有任何Response返回,因此即便服務器崩潰,沒有持久化該消息,生產者也沒法獲知該消息已經丟失。若是此時使用事務,即經過txSelect()開啓一個事務,而後發送消息給服務器,而後經過txCommit()提交該事務,便可以保證,若是txCommit()提交了,則該消息必定會持久化,若是txCommit()還未提交即服務器崩潰,則該消息不會服務器就收。固然Rabbit MQ也提供了txRollback()命令用於回滾某一個事務。

  • 6. Confirm機制:

      使用事務當然能夠保證只有提交的事務,纔會被服務器執行。可是這樣同時也將客戶端與消息服務器同步起來,這背離了消息隊列解耦的本質。Rabbit MQ提供了一個更加輕量級的機制來保證生產者能夠感知服務器消息是否已被路由到正確的隊列中——Confirm。若是設置channel爲confirm狀態,則經過該channel發送的消息都會被分配一個惟一的ID,而後一旦該消息被正確的路由到匹配的隊列中後,服務器會返回給生產者一個Confirm,該Confirm包含該消息的ID,這樣生產者就會知道該消息已被正確分發。對於持久化消息,只有該消息被持久化後,纔會返回Confirm。Confirm機制的最大優勢在於異步,生產者在發送消息之後,便可繼續執行其餘任務。而服務器返回Confirm後,會觸發生產者的回調函數,生產者在回調函數中處理Confirm信息。若是消息服務器發生異常,致使該消息丟失,會返回給生產者一個nack,表示消息已經丟失,這樣生產者就能夠經過重發消息,保證消息不丟失。Confirm機制在性能上要比事務優越不少。可是Confirm機制,沒法進行回滾,就是一旦服務器崩潰,生產者沒法獲得Confirm信息,生產者其實自己也不知道該消息吃否已經被持久化,只有繼續重發來保證消息不丟失,可是若是原先已經持久化的消息,並不會被回滾,這樣隊列中就會存在兩條相同的消息,系統須要支持去重。

 

  • 其餘:

Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。

消息隊列的使用過程大概以下:

(1)客戶端鏈接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
(5)客戶端投遞消息到exchange。

 

Exchanges, queues, and bindings

 
   exchanges, queues, and bindings是三個基礎的概念, 他們的做用是: exchanges are where producers publish their messages,  queues are where the messages end up and are received by consumers, and  bindings are how the messages get routed from the exchange to particular queues. 
  
下面咱們用一副簡單的思惟導圖把上面的概念組織起來:
 
 
   上面還提到了一個vhost的概念,vhost是爲了組織exchanges, queues, and bindings提出的概念,咱們就從它開始講起:
 

VHost

 
   Vhosts也是AMQP的一個基礎概念,鏈接到RabbitMQ默認就有一個名爲"/"的vhost可用,本地調試的時候能夠直接使用這個默認的vhost.這個"/"的訪問可使用guest用戶名(密碼guest)訪問.可使用rabbitmqctl工具修改這個帳戶的權限和密碼,這在生產環境是必需要關注的. 出於安全和可移植性的考慮,一個vhost內的exchange不能綁定到其餘的vhost.
 
    能夠按照業務功能組來規劃vhost,在集羣環境中只要在某個節點建立vhost就會在整個集羣內的節點都建立該vhost.VHost和權限都不能經過AMQP協議建立,在RabbitMQ中都是使用rabbitmqctl進行建立,管理.
 
如何建立vhost   
    vhost和permission(權限)信息是並非經過AMQP建立而是經過rabbitmqctl工具來添加,管理的.
 
說完vhost咱們就來看看重中之重的消息:Message
 

Message

 
   消息由兩部分組成:  payload and  label. "payload"是實際要傳輸的數據,至於數據的格式RabbitMQ並不關心,"label"描述payload,包括exchange name 和可選的topic tag.消息一旦到了consumer那裏就只有payload部分了,label部分並無帶過來.RabbitMQ並不告訴你消息是誰發出的.這比如你收到一封信可是信封上是空白的.固然想知道是誰發的仍是有辦法的,在消息內容中包含發送者的信息就能夠了.
  
   消息的consumer和producer對應的概念是sending和receiving並不對應client和server.經過channel咱們能夠建立不少並行的傳輸 TCP連接再也不成爲瓶頸,咱們能夠把RabbitMQ當作應用程序級別的路由器.
 
 
Consumer消息的接收方式
     Consumer有兩種方式接收消息:
     經過 basic.consume 訂閱隊列.channel將進入接收模式直到你取消訂閱.訂閱模式下Consumer只要上一條消息處理完成(ACK或拒絕),就會主動接收新消息.若是消息到達queue就但願獲得儘快處理,也應該使用basic.consume命令.
     還有一種狀況,咱們不須要一直保持訂閱,只要使用basic.get命令主動獲取消息便可.當前消息處理完成以後,繼續獲取消息須要主動執行basic.get 不要"在循環中使用basic.ge"t當作另一種形式的basic.consume,由於這種作法相比basic.consume有額外的成本:basic.get本質上就是先訂閱queue取回一條消息以後取消訂閱.Consumer吞吐量大的狀況下一般都會使用basic.consume.
 
 
要是沒有Consumer怎麼辦?
 
     若是消息沒有Consumer就會老老實實呆在隊列裏面.
 
多個Consumer訂閱同一個隊列
 
    只要Consumer訂閱了queue,消息就會發送到該Consumer.咱們的問題是這種狀況下queue中的消息是如何分發的?
    若是一個rabbit queue有多個consumer,具體到隊列中的某條消息只會發送到其中的一個Consumer.
 
消息確認
   
    全部接收到的消息都要求發送響應消息(ACK).這裏有兩種方式一種是Consumer使用basic.ack明確發送ACK,一種是訂閱queue的時候指定auto_ack爲true,這樣消息一到Consumer那裏RabbitMQ就會認爲消息已經獲得ACK.
   要注意的是這裏的響應和消息的發送者沒有絲毫關係,ACK只是Consumer向RabbitMQ確認消息已經正確的接收到消息,RabbitMQ能夠安全移除該消息,僅此而已.
 
沒有正確響應怎麼辦
 
    若是Consumer接收了一個消息就尚未發送ACK就與RabbitMQ斷開了,RabbitMQ會認爲這條消息沒有投遞成功會從新投遞到別的Consumer.若是你的應用程序崩掉了,你能夠設置備用程序來繼續完成消息的處理.
   若是Consumer自己邏輯有問題沒有發送ACK的處理,RabbitMQ不會再向該Consumer發送消息.RabbitMQ會認爲這個Consumer尚未處理完上一條消息,沒有能力繼續接收新消息.咱們能夠善加利用這一機制,若是須要處理過程是至關複雜的,應用程序能夠延遲發送ACK直處處理完成爲止.這能夠有效控制應用程序這邊的負載,不致於被大量消息衝擊.
 
 
拒絕消息
 
    因爲要拒絕消息,因此ACK響應消息尚未發出,因此這裏拒絕消息能夠有兩種選擇:
    1.Consumer直接斷開RabbitMQ 這樣RabbitMQ將把這條消息從新排隊,交由其它Consumer處理.這個方法在RabbitMQ各版本都支持.這樣作的壞處就是鏈接斷開增長了RabbitMQ的額外負擔,特別是consumer出現異常每條消息都沒法正常處理的時候.
   2. RabbitMQ 2.0.0可使用 basic.reject 命令,收到該命令RabbitMQ會從新投遞到其它的Consumer.若是設置requeue爲false,RabbitMQ會直接將消息從queue中移除.
   其實還有一種選擇就是直接忽略這條消息併發送ACK,當你明確直到這條消息是異常的不會有Consumer能處理,能夠這樣作拋棄異常數據.爲何要發送basic.reject消息而不是ACK?RabbitMQ後面的版本可能會引入"dead letter"隊列,若是想利用dead letter作點文章就使用basic.reject並設置requeue爲false.
  
 
消息持久化
    消息的持久化須要在消息投遞的時候設置delivery mode值爲2.因爲消息實際存儲於queue之中,"皮之不存毛將焉附"邏輯上,消息持久化同時要求exchange和queue也是持久化的.這是消息持久化必須知足的三個條件. 
     持久化的代價就是性能損失,磁盤IO遠遠慢於RAM(使用SSD會顯著提升消息持久化的性能) , 持久化會大大下降RabbitMQ每秒可處理的消息.二者的性能差距可能在10倍以上.
 
消息恢復
   consumer從durable queue中取回一條消息以後併發回了ACK消息,RabbitMQ就會將其標記,方便後續垃圾回收.若是一條持久化的消息沒有被consumer取走,RabbitMQ重啓以後會自動重建exchange和queue(以及bingding關係),消息經過持久化日誌重建再次進入對應的queues,exchanges.
 
皮之不存,毛將焉附?緊接着咱們看看消息實際存放的地方:Queue

Queue

 
  Queues是Massage的落腳點和等待接收的地方,消息除非被扔進黑洞不然就會被安置在一個Queue裏面.Queue很適合作負載均衡,RabbitMQ能夠在若干consumer中間實現輪流調度(Round-Robin).
 
如何建立隊列
   consumer和producer均可以建立Queue,若是consumer來建立,避免consumer訂閱一個不存在的Queue的狀況,可是這裏要承擔一種風險:消息已經投遞可是consumer還沒有建立隊列,那麼消息就會被扔到黑洞,換句話說消息丟了;避免這種狀況的好辦法就是producer和consumer都嘗試建立一下queue. 若是consumer在已經訂閱了另一個Queue的狀況下沒法完成新Queue的建立,必須取消以前的訂閱將Channel置爲傳輸模式("transmit")才能建立新的Channel.
   建立Queue的時候一般要指定名字,名字方便consumer訂閱.即便你不指定Rabbit會給它分配一個隨機的名字,這在使用臨時匿名隊列完成RPC-over-AMQP調用時會很是有用.
   建立Queue的時候還有兩個很是有用的選項:
   exclusive—When set to true, your queue becomes private and can only be consumed by your app. This is useful when you need to limit a queue to only one consumer.
   auto-delete—The queue is automatically deleted when the last consumer unsubscribes.
 
   若是要建立只有一個consumer使用的臨時queue能夠組合使用auto-delete和 exclusive.consumer一旦斷開鏈接該隊列自動刪除.
   重複建立Queue會怎樣?若是Queue建立的選項徹底一致的話,RabbitMQ直接返回成功,若是名稱相同可是建立選項不一致就會返回建立失敗.若是是想檢查Queue是否存在,能夠設置queue.declare命令的passive 選項爲true:若是隊列存在就會返回成功,若是隊列不存在會報錯且不會執行建立邏輯.
 
消息是如何從動態路由到不一樣的隊列的?這就看下面的內容了
 

bindings and exchanges

 
消息如何發送到隊列
 
     消息是如何發送到隊列的?這就要說到AMQP  bindings and exchanges. 投遞消息到queue都是經由exchange完成的,和生活中的郵件投遞同樣也須要遵循必定的規則,在RabbitMQ中規則是經過routing key把queue綁定到exchange上,這種綁定關係即binding.消息發送到RabbitMQ都會攜帶一個routing key(哪怕是空的key),RabbitMQ會根據bindings匹配routing key,若是匹配成功消息會轉發到指定Queue,若是沒有匹配到queue消息就會被扔到黑洞.
 
如何發送到多個隊列
 
  消息是分發到多個隊列的?AMQP協議裏面定義了幾種不一樣類型的exchange:direct, fanout, topic, and headers. 每一種都實現了一種 routing 算法. header的路由消息並不依賴routing key而是去匹配AMQP消息的header部分,這和下面提到的direct exchange一模一樣,可是性能要差不少,在實際場景中幾乎不會被用到.
 
direct exchange  routing key徹底匹配才轉發
fanout exchange 不理會routing key,消息直接廣播到全部綁定的queue 
topic exchange  對routing key模式匹配
 
 
exchange持久化
 
  建立queue和exchange默認狀況下都是沒有持久化的,節點重啓以後queue和exchange就會消失,這裏須要特別指定queue和exchange的durable屬性.
 
 
Consumer是直接建立TCP連接到RabbitMQ嗎?下面就是答案:
 

Channel

 
    不管是要發佈消息仍是要獲取消息 ,應用程序都須要經過TCP鏈接到RabbitMQ.應用程序鏈接並經過權限認證以後就要建立Channel來執行AMQP命令.Channel是創建在實際TCP鏈接之上通訊管道,這裏之因此引入channel的概念而不是直接經過TCP連接直接發送AMQP命令,是出於兩方面的考慮:創建上成百上千的TCP連接,一方面浪費了TCP連接,一方面很快會觸及系統瓶頸.引入了Channel以後多個進程與RabbitMQ的通訊能夠在一條TCP連接上完成.咱們能夠把TCP類比作光纜,那麼Channel就像光纜中的一根根光纖.
 

參考資料

 
相關文章
相關標籤/搜索